/brz/remove-bazaar

To get this branch, use:
bzr branch http://gegoxaren.bato24.eu/bzr/brz/remove-bazaar

« back to all changes in this revision

Viewing changes to breezy/tests/http_utils.py

  • Committer: Jelmer Vernooij
  • Date: 2020-05-06 02:13:25 UTC
  • mfrom: (7490.7.21 work)
  • mto: This revision was merged to the branch mainline in revision 7501.
  • Revision ID: jelmer@jelmer.uk-20200506021325-awbmmqu1zyorz7sj
Merge 3.1 branch.

Show diffs side-by-side

added added

removed removed

Lines of Context:
14
14
# along with this program; if not, write to the Free Software
15
15
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
16
16
 
 
17
import base64
 
18
from io import BytesIO
17
19
import re
18
 
try:
19
 
    from urllib.request import (
20
 
        parse_http_list,
21
 
        parse_keqv_list,
22
 
        )
23
 
except ImportError:  # python < 3
24
 
    from urllib2 import (
25
 
        parse_http_list,
26
 
        parse_keqv_list,
27
 
        )
 
20
from urllib.request import (
 
21
    parse_http_list,
 
22
    parse_keqv_list,
 
23
    )
28
24
 
29
25
 
30
26
from .. import (
33
29
    tests,
34
30
    transport,
35
31
    )
36
 
from ..sixish import (
37
 
    BytesIO,
38
 
    )
39
32
from ..bzr.smart import (
40
33
    medium,
41
34
    )
90
83
        # we have to stop early due to error, but we would also have to use the
91
84
        # HTTP trailer facility which may not be widely available.
92
85
        request_bytes = self.rfile.read(data_length)
93
 
        protocol_factory, unused_bytes = medium._get_protocol_factory_for_bytes(
94
 
            request_bytes)
 
86
        protocol_factory, unused_bytes = (
 
87
            medium._get_protocol_factory_for_bytes(request_bytes))
95
88
        out_buffer = BytesIO()
96
89
        smart_protocol_request = protocol_factory(t, out_buffer.write, '/')
97
90
        # Perhaps there should be a SmartServerHTTPMedium that takes care of
137
130
    We set up two webservers to allows various tests involving
138
131
    proxies or redirections from one server to the other.
139
132
    """
 
133
 
140
134
    def setUp(self):
141
135
        super(TestCaseWithTwoWebservers, self).setUp()
142
136
        self.transport_secondary_server = http_server.HttpServer
191
185
                # We do not send a body
192
186
                self.send_header('Content-Length', '0')
193
187
                self.end_headers()
194
 
                return False # The job is done
 
188
                return False  # The job is done
195
189
            else:
196
190
                # We leave the parent class serve the request
197
191
                pass
230
224
        code = None
231
225
        target = None
232
226
        for (rsource, rtarget, rcode) in self.redirections:
233
 
            target, match = re.subn(rsource, rtarget, path)
 
227
            target, match = re.subn(rsource, rtarget, path, count=1)
234
228
            if match:
235
229
                code = rcode
236
 
                break # The first match wins
 
230
                break  # The first match wins
237
231
            else:
238
232
                target = None
239
233
        return code, target
240
234
 
241
235
 
242
236
class TestCaseWithRedirectedWebserver(TestCaseWithTwoWebservers):
243
 
   """A support class providing redirections from one server to another.
244
 
 
245
 
   We set up two webservers to allows various tests involving
246
 
   redirections.
247
 
   The 'old' server is redirected to the 'new' server.
248
 
   """
249
 
 
250
 
   def setUp(self):
251
 
       super(TestCaseWithRedirectedWebserver, self).setUp()
252
 
       # The redirections will point to the new server
253
 
       self.new_server = self.get_readonly_server()
254
 
       # The requests to the old server will be redirected to the new server
255
 
       self.old_server = self.get_secondary_server()
256
 
 
257
 
   def create_transport_secondary_server(self):
258
 
       """Create the secondary server redirecting to the primary server"""
259
 
       new = self.get_readonly_server()
260
 
       redirecting = HTTPServerRedirecting(
261
 
           protocol_version=self._protocol_version)
262
 
       redirecting.redirect_to(new.host, new.port)
263
 
       redirecting._url_protocol = self._url_protocol
264
 
       return redirecting
265
 
 
266
 
   def get_old_url(self, relpath=None):
 
237
    """A support class providing redirections from one server to another.
 
238
 
 
239
    We set up two webservers to allows various tests involving
 
240
    redirections.
 
241
    The 'old' server is redirected to the 'new' server.
 
242
    """
 
243
 
 
244
    def setUp(self):
 
245
        super(TestCaseWithRedirectedWebserver, self).setUp()
 
246
        # The redirections will point to the new server
 
247
        self.new_server = self.get_readonly_server()
 
248
        # The requests to the old server will be redirected to the new server
 
249
        self.old_server = self.get_secondary_server()
 
250
 
 
251
    def create_transport_secondary_server(self):
 
252
        """Create the secondary server redirecting to the primary server"""
 
253
        new = self.get_readonly_server()
 
254
        redirecting = HTTPServerRedirecting(
 
255
            protocol_version=self._protocol_version)
 
256
        redirecting.redirect_to(new.host, new.port)
 
257
        redirecting._url_protocol = self._url_protocol
 
258
        return redirecting
 
259
 
 
260
    def get_old_url(self, relpath=None):
267
261
        base = self.old_server.get_url()
268
262
        return self._adjust_url(base, relpath)
269
263
 
270
 
   def get_old_transport(self, relpath=None):
 
264
    def get_old_transport(self, relpath=None):
271
265
        t = transport.get_transport_from_url(self.get_old_url(relpath))
272
266
        self.assertTrue(t.is_readonly())
273
267
        return t
274
268
 
275
 
   def get_new_url(self, relpath=None):
 
269
    def get_new_url(self, relpath=None):
276
270
        base = self.new_server.get_url()
277
271
        return self._adjust_url(base, relpath)
278
272
 
279
 
   def get_new_transport(self, relpath=None):
 
273
    def get_new_transport(self, relpath=None):
280
274
        t = transport.get_transport_from_url(self.get_new_url(relpath))
281
275
        self.assertTrue(t.is_readonly())
282
276
        return t
333
327
        if auth_header:
334
328
            scheme, raw_auth = auth_header.split(' ', 1)
335
329
            if scheme.lower() == tcs.auth_scheme:
336
 
                user, password = raw_auth.decode('base64').split(':')
337
 
                return tcs.authorized(user, password)
 
330
                user, password = base64.b64decode(raw_auth).split(b':')
 
331
                return tcs.authorized(user.decode('ascii'),
 
332
                                      password.decode('ascii'))
338
333
 
339
334
        return False
340
335
 
410
405
    auth_header_sent = None
411
406
    auth_header_recv = None
412
407
    auth_error_code = None
413
 
    auth_realm = "Thou should not pass"
 
408
    auth_realm = u"Thou should not pass"
414
409
 
415
410
    def __init__(self, request_handler, auth_scheme,
416
411
                 protocol_version=None):
459
454
        user = auth['username']
460
455
        if user not in self.password_of:
461
456
            return False
462
 
        algorithm= auth['algorithm']
 
457
        algorithm = auth['algorithm']
463
458
        if algorithm != 'MD5':
464
459
            return False
465
460
        qop = auth['qop']
470
465
 
471
466
        # Recalculate the response_digest to compare with the one
472
467
        # sent by the client
473
 
        A1 = '%s:%s:%s' % (user, realm, password)
474
 
        A2 = '%s:%s' % (command, auth['uri'])
475
 
 
476
 
        H = lambda x: osutils.md5(x).hexdigest()
477
 
        KD = lambda secret, data: H("%s:%s" % (secret, data))
 
468
        A1 = ('%s:%s:%s' % (user, realm, password)).encode('utf-8')
 
469
        A2 = ('%s:%s' % (command, auth['uri'])).encode('utf-8')
 
470
 
 
471
        def H(x):
 
472
            return osutils.md5(x).hexdigest()
 
473
 
 
474
        def KD(secret, data):
 
475
            return H(("%s:%s" % (secret, data)).encode('utf-8'))
478
476
 
479
477
        nonce_count = int(auth['nc'], 16)
480
478
 
564
562
        self.init_proxy_auth()
565
563
        # We really accept Digest only
566
564
        self.auth_scheme = 'digest'
567
 
 
568