/brz/remove-bazaar

To get this branch, use:
bzr branch http://gegoxaren.bato24.eu/bzr/brz/remove-bazaar

« back to all changes in this revision

Viewing changes to breezy/tests/http_utils.py

  • Committer: Jelmer Vernooij
  • Date: 2018-07-08 14:45:27 UTC
  • mto: This revision was merged to the branch mainline in revision 7036.
  • Revision ID: jelmer@jelmer.uk-20180708144527-codhlvdcdg9y0nji
Fix a bunch of merge tests.

Show diffs side-by-side

added added

removed removed

Lines of Context:
1
 
# Copyright (C) 2005-2010 Canonical Ltd
 
1
# Copyright (C) 2005-2011 Canonical Ltd
2
2
#
3
3
# This program is free software; you can redistribute it and/or modify
4
4
# it under the terms of the GNU General Public License as published by
14
14
# along with this program; if not, write to the Free Software
15
15
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
16
16
 
17
 
from cStringIO import StringIO
18
 
import errno
19
17
import re
20
 
import socket
21
 
import threading
22
 
import time
23
 
import urllib2
24
 
import urlparse
25
 
 
26
 
 
27
 
from bzrlib import (
 
18
try:
 
19
    from urllib.request import (
 
20
        parse_http_list,
 
21
        parse_keqv_list,
 
22
        )
 
23
except ImportError:  # python < 3
 
24
    from urllib2 import (
 
25
        parse_http_list,
 
26
        parse_keqv_list,
 
27
        )
 
28
 
 
29
 
 
30
from .. import (
28
31
    errors,
29
32
    osutils,
30
33
    tests,
31
 
    )
32
 
from bzrlib.smart import medium, protocol
33
 
from bzrlib.tests import http_server
34
 
from bzrlib.transport import (
35
 
    chroot,
36
 
    get_transport,
37
 
    )
 
34
    transport,
 
35
    )
 
36
from ..sixish import (
 
37
    BytesIO,
 
38
    )
 
39
from ..bzr.smart import (
 
40
    medium,
 
41
    )
 
42
from . import http_server
 
43
from ..transport import chroot
38
44
 
39
45
 
40
46
class HTTPServerWithSmarts(http_server.HttpServer):
51
57
class SmartRequestHandler(http_server.TestingHTTPRequestHandler):
52
58
    """Extend TestingHTTPRequestHandler to support smart client POSTs.
53
59
 
54
 
    XXX: This duplicates a fair bit of the logic in bzrlib.transport.http.wsgi.
 
60
    XXX: This duplicates a fair bit of the logic in breezy.transport.http.wsgi.
55
61
    """
56
62
 
57
63
    def do_POST(self):
58
64
        """Hand the request off to a smart server instance."""
59
 
        backing = get_transport(self.server.test_case_server._home_dir)
 
65
        backing = transport.get_transport_from_path(
 
66
            self.server.test_case_server._home_dir)
60
67
        chroot_server = chroot.ChrootServer(backing)
61
68
        chroot_server.start_server()
62
69
        try:
63
 
            t = get_transport(chroot_server.get_url())
 
70
            t = transport.get_transport_from_url(chroot_server.get_url())
64
71
            self.do_POST_inner(t)
65
72
        finally:
66
73
            chroot_server.stop_server()
85
92
        request_bytes = self.rfile.read(data_length)
86
93
        protocol_factory, unused_bytes = medium._get_protocol_factory_for_bytes(
87
94
            request_bytes)
88
 
        out_buffer = StringIO()
 
95
        out_buffer = BytesIO()
89
96
        smart_protocol_request = protocol_factory(t, out_buffer.write, '/')
90
97
        # Perhaps there should be a SmartServerHTTPMedium that takes care of
91
98
        # feeding the bytes in the http request to the smart_protocol_request,
106
113
    one. This will currently fail if the primary transport is not
107
114
    backed by regular disk files.
108
115
    """
 
116
 
 
117
    # These attributes can be overridden or parametrized by daughter classes if
 
118
    # needed, but must exist so that the create_transport_readonly_server()
 
119
    # method (or any method creating an http(s) server) can propagate it.
 
120
    _protocol_version = None
 
121
    _url_protocol = 'http'
 
122
 
109
123
    def setUp(self):
110
124
        super(TestCaseWithWebserver, self).setUp()
111
125
        self.transport_readonly_server = http_server.HttpServer
112
126
 
 
127
    def create_transport_readonly_server(self):
 
128
        server = self.transport_readonly_server(
 
129
            protocol_version=self._protocol_version)
 
130
        server._url_protocol = self._url_protocol
 
131
        return server
 
132
 
113
133
 
114
134
class TestCaseWithTwoWebservers(TestCaseWithWebserver):
115
135
    """A support class providing readonly urls on two servers that are http://.
127
147
 
128
148
        This is mostly a hook for daughter classes.
129
149
        """
130
 
        return self.transport_secondary_server()
 
150
        server = self.transport_secondary_server(
 
151
            protocol_version=self._protocol_version)
 
152
        server._url_protocol = self._url_protocol
 
153
        return server
131
154
 
132
155
    def get_secondary_server(self):
133
156
        """Get the server instance for the secondary transport."""
136
159
            self.start_server(self.__secondary_server)
137
160
        return self.__secondary_server
138
161
 
 
162
    def get_secondary_url(self, relpath=None):
 
163
        base = self.get_secondary_server().get_url()
 
164
        return self._adjust_url(base, relpath)
 
165
 
 
166
    def get_secondary_transport(self, relpath=None):
 
167
        t = transport.get_transport_from_url(self.get_secondary_url(relpath))
 
168
        self.assertTrue(t.is_readonly())
 
169
        return t
 
170
 
139
171
 
140
172
class ProxyServer(http_server.HttpServer):
141
173
    """A proxy test server for http transports."""
184
216
    def redirect_to(self, host, port):
185
217
        """Redirect all requests to a specific host:port"""
186
218
        self.redirections = [('(.*)',
187
 
                              r'http://%s:%s\1' % (host, port) ,
 
219
                              r'http://%s:%s\1' % (host, port),
188
220
                              301)]
189
221
 
190
222
    def is_redirected(self, path):
215
247
   The 'old' server is redirected to the 'new' server.
216
248
   """
217
249
 
 
250
   def setUp(self):
 
251
       super(TestCaseWithRedirectedWebserver, self).setUp()
 
252
       # The redirections will point to the new server
 
253
       self.new_server = self.get_readonly_server()
 
254
       # The requests to the old server will be redirected to the new server
 
255
       self.old_server = self.get_secondary_server()
 
256
 
218
257
   def create_transport_secondary_server(self):
219
258
       """Create the secondary server redirecting to the primary server"""
220
259
       new = self.get_readonly_server()
221
 
       redirecting = HTTPServerRedirecting()
 
260
       redirecting = HTTPServerRedirecting(
 
261
           protocol_version=self._protocol_version)
222
262
       redirecting.redirect_to(new.host, new.port)
 
263
       redirecting._url_protocol = self._url_protocol
223
264
       return redirecting
224
265
 
225
 
   def setUp(self):
226
 
       super(TestCaseWithRedirectedWebserver, self).setUp()
227
 
       # The redirections will point to the new server
228
 
       self.new_server = self.get_readonly_server()
229
 
       # The requests to the old server will be redirected
230
 
       self.old_server = self.get_secondary_server()
 
266
   def get_old_url(self, relpath=None):
 
267
        base = self.old_server.get_url()
 
268
        return self._adjust_url(base, relpath)
 
269
 
 
270
   def get_old_transport(self, relpath=None):
 
271
        t = transport.get_transport_from_url(self.get_old_url(relpath))
 
272
        self.assertTrue(t.is_readonly())
 
273
        return t
 
274
 
 
275
   def get_new_url(self, relpath=None):
 
276
        base = self.new_server.get_url()
 
277
        return self._adjust_url(base, relpath)
 
278
 
 
279
   def get_new_transport(self, relpath=None):
 
280
        t = transport.get_transport_from_url(self.get_new_url(relpath))
 
281
        self.assertTrue(t.is_readonly())
 
282
        return t
231
283
 
232
284
 
233
285
class AuthRequestHandler(http_server.TestingHTTPRequestHandler):
243
295
    # - auth_header_recv: the header received containing auth
244
296
    # - auth_error_code: the error code to indicate auth required
245
297
 
 
298
    def _require_authentication(self):
 
299
        # Note that we must update test_case_server *before*
 
300
        # sending the error or the client may try to read it
 
301
        # before we have sent the whole error back.
 
302
        tcs = self.server.test_case_server
 
303
        tcs.auth_required_errors += 1
 
304
        self.send_response(tcs.auth_error_code)
 
305
        self.send_header_auth_reqed()
 
306
        # We do not send a body
 
307
        self.send_header('Content-Length', '0')
 
308
        self.end_headers()
 
309
        return
 
310
 
246
311
    def do_GET(self):
247
312
        if self.authorized():
248
313
            return http_server.TestingHTTPRequestHandler.do_GET(self)
249
314
        else:
250
 
            # Note that we must update test_case_server *before*
251
 
            # sending the error or the client may try to read it
252
 
            # before we have sent the whole error back.
253
 
            tcs = self.server.test_case_server
254
 
            tcs.auth_required_errors += 1
255
 
            self.send_response(tcs.auth_error_code)
256
 
            self.send_header_auth_reqed()
257
 
            # We do not send a body
258
 
            self.send_header('Content-Length', '0')
259
 
            self.end_headers()
260
 
            return
 
315
            return self._require_authentication()
 
316
 
 
317
    def do_HEAD(self):
 
318
        if self.authorized():
 
319
            return http_server.TestingHTTPRequestHandler.do_HEAD(self)
 
320
        else:
 
321
            return self._require_authentication()
261
322
 
262
323
 
263
324
class BasicAuthRequestHandler(AuthRequestHandler):
302
363
            return False
303
364
        scheme, auth = auth_header.split(None, 1)
304
365
        if scheme.lower() == tcs.auth_scheme:
305
 
            auth_dict = urllib2.parse_keqv_list(urllib2.parse_http_list(auth))
 
366
            auth_dict = parse_keqv_list(parse_http_list(auth))
306
367
 
307
368
            return tcs.digest_authorized(auth_dict, self.command)
308
369
 
313
374
        header = 'Digest realm="%s", ' % tcs.auth_realm
314
375
        header += 'nonce="%s", algorithm="%s", qop="auth"' % (tcs.auth_nonce,
315
376
                                                              'MD5')
316
 
        self.send_header(tcs.auth_header_sent,header)
 
377
        self.send_header(tcs.auth_header_sent, header)
317
378
 
318
379
 
319
380
class DigestAndBasicAuthRequestHandler(DigestAuthRequestHandler):
331
392
        header = 'Digest realm="%s", ' % tcs.auth_realm
332
393
        header += 'nonce="%s", algorithm="%s", qop="auth"' % (tcs.auth_nonce,
333
394
                                                              'MD5')
334
 
        self.send_header(tcs.auth_header_sent,header)
 
395
        self.send_header(tcs.auth_header_sent, header)
335
396
 
336
397
 
337
398
class AuthServer(http_server.HttpServer):
396
457
        if realm != self.auth_realm:
397
458
            return False
398
459
        user = auth['username']
399
 
        if not self.password_of.has_key(user):
 
460
        if user not in self.password_of:
400
461
            return False
401
462
        algorithm= auth['algorithm']
402
463
        if algorithm != 'MD5':