/brz/remove-bazaar

To get this branch, use:
bzr branch http://gegoxaren.bato24.eu/bzr/brz/remove-bazaar

« back to all changes in this revision

Viewing changes to breezy/tests/http_utils.py

  • Committer: Jelmer Vernooij
  • Date: 2019-06-02 02:35:46 UTC
  • mfrom: (7309 work)
  • mto: This revision was merged to the branch mainline in revision 7319.
  • Revision ID: jelmer@jelmer.uk-20190602023546-lqco868tnv26d8ow
merge trunk.

Show diffs side-by-side

added added

removed removed

Lines of Context:
91
91
        # we have to stop early due to error, but we would also have to use the
92
92
        # HTTP trailer facility which may not be widely available.
93
93
        request_bytes = self.rfile.read(data_length)
94
 
        protocol_factory, unused_bytes = medium._get_protocol_factory_for_bytes(
95
 
            request_bytes)
 
94
        protocol_factory, unused_bytes = (
 
95
            medium._get_protocol_factory_for_bytes(request_bytes))
96
96
        out_buffer = BytesIO()
97
97
        smart_protocol_request = protocol_factory(t, out_buffer.write, '/')
98
98
        # Perhaps there should be a SmartServerHTTPMedium that takes care of
138
138
    We set up two webservers to allows various tests involving
139
139
    proxies or redirections from one server to the other.
140
140
    """
 
141
 
141
142
    def setUp(self):
142
143
        super(TestCaseWithTwoWebservers, self).setUp()
143
144
        self.transport_secondary_server = http_server.HttpServer
192
193
                # We do not send a body
193
194
                self.send_header('Content-Length', '0')
194
195
                self.end_headers()
195
 
                return False # The job is done
 
196
                return False  # The job is done
196
197
            else:
197
198
                # We leave the parent class serve the request
198
199
                pass
231
232
        code = None
232
233
        target = None
233
234
        for (rsource, rtarget, rcode) in self.redirections:
234
 
            target, match = re.subn(rsource, rtarget, path)
 
235
            target, match = re.subn(rsource, rtarget, path, count=1)
235
236
            if match:
236
237
                code = rcode
237
 
                break # The first match wins
 
238
                break  # The first match wins
238
239
            else:
239
240
                target = None
240
241
        return code, target
241
242
 
242
243
 
243
244
class TestCaseWithRedirectedWebserver(TestCaseWithTwoWebservers):
244
 
   """A support class providing redirections from one server to another.
245
 
 
246
 
   We set up two webservers to allows various tests involving
247
 
   redirections.
248
 
   The 'old' server is redirected to the 'new' server.
249
 
   """
250
 
 
251
 
   def setUp(self):
252
 
       super(TestCaseWithRedirectedWebserver, self).setUp()
253
 
       # The redirections will point to the new server
254
 
       self.new_server = self.get_readonly_server()
255
 
       # The requests to the old server will be redirected to the new server
256
 
       self.old_server = self.get_secondary_server()
257
 
 
258
 
   def create_transport_secondary_server(self):
259
 
       """Create the secondary server redirecting to the primary server"""
260
 
       new = self.get_readonly_server()
261
 
       redirecting = HTTPServerRedirecting(
262
 
           protocol_version=self._protocol_version)
263
 
       redirecting.redirect_to(new.host, new.port)
264
 
       redirecting._url_protocol = self._url_protocol
265
 
       return redirecting
266
 
 
267
 
   def get_old_url(self, relpath=None):
 
245
    """A support class providing redirections from one server to another.
 
246
 
 
247
    We set up two webservers to allows various tests involving
 
248
    redirections.
 
249
    The 'old' server is redirected to the 'new' server.
 
250
    """
 
251
 
 
252
    def setUp(self):
 
253
        super(TestCaseWithRedirectedWebserver, self).setUp()
 
254
        # The redirections will point to the new server
 
255
        self.new_server = self.get_readonly_server()
 
256
        # The requests to the old server will be redirected to the new server
 
257
        self.old_server = self.get_secondary_server()
 
258
 
 
259
    def create_transport_secondary_server(self):
 
260
        """Create the secondary server redirecting to the primary server"""
 
261
        new = self.get_readonly_server()
 
262
        redirecting = HTTPServerRedirecting(
 
263
            protocol_version=self._protocol_version)
 
264
        redirecting.redirect_to(new.host, new.port)
 
265
        redirecting._url_protocol = self._url_protocol
 
266
        return redirecting
 
267
 
 
268
    def get_old_url(self, relpath=None):
268
269
        base = self.old_server.get_url()
269
270
        return self._adjust_url(base, relpath)
270
271
 
271
 
   def get_old_transport(self, relpath=None):
 
272
    def get_old_transport(self, relpath=None):
272
273
        t = transport.get_transport_from_url(self.get_old_url(relpath))
273
274
        self.assertTrue(t.is_readonly())
274
275
        return t
275
276
 
276
 
   def get_new_url(self, relpath=None):
 
277
    def get_new_url(self, relpath=None):
277
278
        base = self.new_server.get_url()
278
279
        return self._adjust_url(base, relpath)
279
280
 
280
 
   def get_new_transport(self, relpath=None):
 
281
    def get_new_transport(self, relpath=None):
281
282
        t = transport.get_transport_from_url(self.get_new_url(relpath))
282
283
        self.assertTrue(t.is_readonly())
283
284
        return t
461
462
        user = auth['username']
462
463
        if user not in self.password_of:
463
464
            return False
464
 
        algorithm= auth['algorithm']
 
465
        algorithm = auth['algorithm']
465
466
        if algorithm != 'MD5':
466
467
            return False
467
468
        qop = auth['qop']
475
476
        A1 = ('%s:%s:%s' % (user, realm, password)).encode('utf-8')
476
477
        A2 = ('%s:%s' % (command, auth['uri'])).encode('utf-8')
477
478
 
478
 
        H = lambda x: osutils.md5(x).hexdigest()
479
 
        KD = lambda secret, data: H(("%s:%s" % (secret, data)).encode('utf-8'))
 
479
        def H(x):
 
480
            return osutils.md5(x).hexdigest()
 
481
 
 
482
        def KD(secret, data):
 
483
            return H(("%s:%s" % (secret, data)).encode('utf-8'))
480
484
 
481
485
        nonce_count = int(auth['nc'], 16)
482
486
 
566
570
        self.init_proxy_auth()
567
571
        # We really accept Digest only
568
572
        self.auth_scheme = 'digest'
569
 
 
570