/brz/remove-bazaar

To get this branch, use:
bzr branch http://gegoxaren.bato24.eu/bzr/brz/remove-bazaar

« back to all changes in this revision

Viewing changes to bzrlib/tests/test_http.py

Prepare test framework for pyftpdlib injection.

* bzrlib/tests/ftp_server/__init__.py:
Provide a single front-end for ftp test servers.

* bzrlib/tests/test_ftp_transport.py: 
Use ftp_server import.

* bzrlib/transport/ftp/__init__.py:
(get_test_permutations): Simplified.

* bzrlib/tests/__init__.py:
(FTPServerFeature): Moved to bzrlib/tests/ftp_server/__init__.py.

Show diffs side-by-side

added added

removed removed

Lines of Context:
1
 
# Copyright (C) 2005-2010 Canonical Ltd
 
1
# Copyright (C) 2005, 2006, 2007, 2008, 2009 Canonical Ltd
2
2
#
3
3
# This program is free software; you can redistribute it and/or modify
4
4
# it under the terms of the GNU General Public License as published by
12
12
#
13
13
# You should have received a copy of the GNU General Public License
14
14
# along with this program; if not, write to the Free Software
15
 
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
 
15
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
16
16
 
17
17
"""Tests for HTTP implementations.
18
18
 
48
48
    deprecated_in,
49
49
    )
50
50
from bzrlib.tests import (
51
 
    features,
52
51
    http_server,
53
52
    http_utils,
54
53
    )
62
61
    )
63
62
 
64
63
 
65
 
if features.pycurl.available():
 
64
try:
66
65
    from bzrlib.transport.http._pycurl import PyCurlTransport
 
66
    pycurl_present = True
 
67
except errors.DependencyNotPresent:
 
68
    pycurl_present = False
67
69
 
68
70
 
69
71
def load_tests(standard_tests, module, loader):
70
72
    """Multiply tests for http clients and protocol versions."""
71
73
    result = loader.suiteClass()
 
74
    adapter = tests.TestScenarioApplier()
 
75
    remaining_tests = standard_tests
72
76
 
73
 
    # one for each transport implementation
 
77
    # one for each transport
74
78
    t_tests, remaining_tests = tests.split_suite_by_condition(
75
 
        standard_tests, tests.condition_isinstance((
 
79
        remaining_tests, tests.condition_isinstance((
76
80
                TestHttpTransportRegistration,
77
81
                TestHttpTransportUrls,
78
82
                Test_redirected_to,
82
86
                        _server=http_server.HttpServer_urllib,
83
87
                        _qualified_prefix='http+urllib',)),
84
88
        ]
85
 
    if features.pycurl.available():
 
89
    if pycurl_present:
86
90
        transport_scenarios.append(
87
91
            ('pycurl', dict(_transport=PyCurlTransport,
88
92
                            _server=http_server.HttpServer_PyCurl,
89
93
                            _qualified_prefix='http+pycurl',)))
90
 
    tests.multiply_tests(t_tests, transport_scenarios, result)
 
94
    adapter.scenarios = transport_scenarios
 
95
    tests.adapt_tests(t_tests, adapter, result)
91
96
 
92
 
    # each implementation tested with each HTTP version
 
97
    # multiplied by one for each protocol version
93
98
    tp_tests, remaining_tests = tests.split_suite_by_condition(
94
99
        remaining_tests, tests.condition_isinstance((
95
100
                SmartHTTPTunnellingTest,
107
112
            ('HTTP/1.0',  dict(_protocol_version='HTTP/1.0')),
108
113
            ('HTTP/1.1',  dict(_protocol_version='HTTP/1.1')),
109
114
            ]
110
 
    tp_scenarios = tests.multiply_scenarios(transport_scenarios,
 
115
    tp_scenarios = tests.multiply_scenarios(adapter.scenarios,
111
116
                                            protocol_scenarios)
112
 
    tests.multiply_tests(tp_tests, tp_scenarios, result)
113
 
 
114
 
    # proxy auth: each auth scheme on all http versions on all implementations.
115
 
    tppa_tests, remaining_tests = tests.split_suite_by_condition(
116
 
        remaining_tests, tests.condition_isinstance((
117
 
                TestProxyAuth,
118
 
                )))
119
 
    proxy_auth_scheme_scenarios = [
120
 
        ('basic', dict(_auth_server=http_utils.ProxyBasicAuthServer)),
121
 
        ('digest', dict(_auth_server=http_utils.ProxyDigestAuthServer)),
122
 
        ('basicdigest',
123
 
         dict(_auth_server=http_utils.ProxyBasicAndDigestAuthServer)),
124
 
        ]
125
 
    tppa_scenarios = tests.multiply_scenarios(tp_scenarios,
126
 
                                              proxy_auth_scheme_scenarios)
127
 
    tests.multiply_tests(tppa_tests, tppa_scenarios, result)
128
 
 
129
 
    # auth: each auth scheme on all http versions on all implementations.
 
117
    adapter.scenarios = tp_scenarios
 
118
    tests.adapt_tests(tp_tests, adapter, result)
 
119
 
 
120
    # multiplied by one for each authentication scheme
130
121
    tpa_tests, remaining_tests = tests.split_suite_by_condition(
131
122
        remaining_tests, tests.condition_isinstance((
132
123
                TestAuth,
133
124
                )))
134
125
    auth_scheme_scenarios = [
135
 
        ('basic', dict(_auth_server=http_utils.HTTPBasicAuthServer)),
136
 
        ('digest', dict(_auth_server=http_utils.HTTPDigestAuthServer)),
137
 
        ('basicdigest',
138
 
         dict(_auth_server=http_utils.HTTPBasicAndDigestAuthServer)),
 
126
        ('basic', dict(_auth_scheme='basic')),
 
127
        ('digest', dict(_auth_scheme='digest')),
139
128
        ]
140
 
    tpa_scenarios = tests.multiply_scenarios(tp_scenarios,
141
 
                                             auth_scheme_scenarios)
142
 
    tests.multiply_tests(tpa_tests, tpa_scenarios, result)
 
129
    adapter.scenarios = tests.multiply_scenarios(adapter.scenarios,
 
130
                                                 auth_scheme_scenarios)
 
131
    tests.adapt_tests(tpa_tests, adapter, result)
143
132
 
144
 
    # activity: on all http[s] versions on all implementations
145
133
    tpact_tests, remaining_tests = tests.split_suite_by_condition(
146
134
        remaining_tests, tests.condition_isinstance((
147
135
                TestActivity,
148
136
                )))
149
137
    activity_scenarios = [
150
 
        ('urllib,http', dict(_activity_server=ActivityHTTPServer,
151
 
                             _transport=_urllib.HttpTransport_urllib,)),
 
138
        ('http', dict(_activity_server=ActivityHTTPServer)),
152
139
        ]
153
140
    if tests.HTTPSServerFeature.available():
154
141
        activity_scenarios.append(
155
 
            ('urllib,https', dict(_activity_server=ActivityHTTPSServer,
156
 
                                  _transport=_urllib.HttpTransport_urllib,)),)
157
 
    if features.pycurl.available():
158
 
        activity_scenarios.append(
159
 
            ('pycurl,http', dict(_activity_server=ActivityHTTPServer,
160
 
                                 _transport=PyCurlTransport,)),)
161
 
        if tests.HTTPSServerFeature.available():
162
 
            from bzrlib.tests import (
163
 
                ssl_certs,
164
 
                )
165
 
            # FIXME: Until we have a better way to handle self-signed
166
 
            # certificates (like allowing them in a test specific
167
 
            # authentication.conf for example), we need some specialized pycurl
168
 
            # transport for tests.
169
 
            class HTTPS_pycurl_transport(PyCurlTransport):
170
 
 
171
 
                def __init__(self, base, _from_transport=None):
172
 
                    super(HTTPS_pycurl_transport, self).__init__(
173
 
                        base, _from_transport)
174
 
                    self.cabundle = str(ssl_certs.build_path('ca.crt'))
175
 
 
176
 
            activity_scenarios.append(
177
 
                ('pycurl,https', dict(_activity_server=ActivityHTTPSServer,
178
 
                                      _transport=HTTPS_pycurl_transport,)),)
179
 
 
180
 
    tpact_scenarios = tests.multiply_scenarios(activity_scenarios,
181
 
                                               protocol_scenarios)
182
 
    tests.multiply_tests(tpact_tests, tpact_scenarios, result)
 
142
            ('https', dict(_activity_server=ActivityHTTPSServer,)))
 
143
    adapter.scenarios = tests.multiply_scenarios(tp_scenarios,
 
144
                                                 activity_scenarios)
 
145
    tests.adapt_tests(tpact_tests, adapter, result)
183
146
 
184
147
    # No parametrization for the remaining tests
185
148
    result.addTests(remaining_tests)
202
165
    It records the bytes sent to it, and replies with a 200.
203
166
    """
204
167
 
205
 
    def __init__(self, expect_body_tail=None, scheme=''):
 
168
    def __init__(self, expect_body_tail=None):
206
169
        """Constructor.
207
170
 
208
171
        :type expect_body_tail: str
213
176
        self.host = None
214
177
        self.port = None
215
178
        self.received_bytes = ''
216
 
        self.scheme = scheme
217
 
 
218
 
    def get_url(self):
219
 
        return '%s://%s:%s/' % (self.scheme, self.host, self.port)
220
 
 
221
 
    def start_server(self):
 
179
 
 
180
    def setUp(self):
222
181
        self._sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
223
182
        self._sock.bind(('127.0.0.1', 0))
224
183
        self.host, self.port = self._sock.getsockname()
247
206
            # The client may have already closed the socket.
248
207
            pass
249
208
 
250
 
    def stop_server(self):
 
209
    def tearDown(self):
251
210
        try:
252
211
            self._sock.close()
253
212
        except socket.error:
259
218
 
260
219
class TestAuthHeader(tests.TestCase):
261
220
 
262
 
    def parse_header(self, header, auth_handler_class=None):
263
 
        if auth_handler_class is None:
264
 
            auth_handler_class = _urllib2_wrappers.AbstractAuthHandler
265
 
        self.auth_handler =  auth_handler_class()
266
 
        return self.auth_handler._parse_auth_header(header)
 
221
    def parse_header(self, header):
 
222
        ah =  _urllib2_wrappers.AbstractAuthHandler()
 
223
        return ah._parse_auth_header(header)
267
224
 
268
225
    def test_empty_header(self):
269
226
        scheme, remainder = self.parse_header('')
270
 
        self.assertEqual('', scheme)
 
227
        self.assertEquals('', scheme)
271
228
        self.assertIs(None, remainder)
272
229
 
273
230
    def test_negotiate_header(self):
274
231
        scheme, remainder = self.parse_header('Negotiate')
275
 
        self.assertEqual('negotiate', scheme)
 
232
        self.assertEquals('negotiate', scheme)
276
233
        self.assertIs(None, remainder)
277
234
 
278
235
    def test_basic_header(self):
279
236
        scheme, remainder = self.parse_header(
280
237
            'Basic realm="Thou should not pass"')
281
 
        self.assertEqual('basic', scheme)
282
 
        self.assertEqual('realm="Thou should not pass"', remainder)
283
 
 
284
 
    def test_basic_extract_realm(self):
285
 
        scheme, remainder = self.parse_header(
286
 
            'Basic realm="Thou should not pass"',
287
 
            _urllib2_wrappers.BasicAuthHandler)
288
 
        match, realm = self.auth_handler.extract_realm(remainder)
289
 
        self.assertTrue(match is not None)
290
 
        self.assertEqual('Thou should not pass', realm)
 
238
        self.assertEquals('basic', scheme)
 
239
        self.assertEquals('realm="Thou should not pass"', remainder)
291
240
 
292
241
    def test_digest_header(self):
293
242
        scheme, remainder = self.parse_header(
294
243
            'Digest realm="Thou should not pass"')
295
 
        self.assertEqual('digest', scheme)
296
 
        self.assertEqual('realm="Thou should not pass"', remainder)
 
244
        self.assertEquals('digest', scheme)
 
245
        self.assertEquals('realm="Thou should not pass"', remainder)
297
246
 
298
247
 
299
248
class TestHTTPServer(tests.TestCase):
306
255
 
307
256
        server = http_server.HttpServer(BogusRequestHandler)
308
257
        try:
309
 
            self.assertRaises(httplib.UnknownProtocol, server.start_server)
 
258
            self.assertRaises(httplib.UnknownProtocol,server.setUp)
310
259
        except:
311
 
            server.stop_server()
 
260
            server.tearDown()
312
261
            self.fail('HTTP Server creation did not raise UnknownProtocol')
313
262
 
314
263
    def test_force_invalid_protocol(self):
315
264
        server = http_server.HttpServer(protocol_version='HTTP/0.1')
316
265
        try:
317
 
            self.assertRaises(httplib.UnknownProtocol, server.start_server)
 
266
            self.assertRaises(httplib.UnknownProtocol,server.setUp)
318
267
        except:
319
 
            server.stop_server()
 
268
            server.tearDown()
320
269
            self.fail('HTTP Server creation did not raise UnknownProtocol')
321
270
 
322
271
    def test_server_start_and_stop(self):
323
272
        server = http_server.HttpServer()
324
 
        server.start_server()
325
 
        try:
326
 
            self.assertTrue(server._http_running)
327
 
        finally:
328
 
            server.stop_server()
 
273
        server.setUp()
 
274
        self.assertTrue(server._http_running)
 
275
        server.tearDown()
329
276
        self.assertFalse(server._http_running)
330
277
 
331
278
    def test_create_http_server_one_zero(self):
334
281
            protocol_version = 'HTTP/1.0'
335
282
 
336
283
        server = http_server.HttpServer(RequestHandlerOneZero)
337
 
        self.start_server(server)
 
284
        server.setUp()
 
285
        self.addCleanup(server.tearDown)
338
286
        self.assertIsInstance(server._httpd, http_server.TestingHTTPServer)
339
287
 
340
288
    def test_create_http_server_one_one(self):
343
291
            protocol_version = 'HTTP/1.1'
344
292
 
345
293
        server = http_server.HttpServer(RequestHandlerOneOne)
346
 
        self.start_server(server)
 
294
        server.setUp()
 
295
        self.addCleanup(server.tearDown)
347
296
        self.assertIsInstance(server._httpd,
348
297
                              http_server.TestingThreadingHTTPServer)
349
298
 
354
303
 
355
304
        server = http_server.HttpServer(RequestHandlerOneZero,
356
305
                                        protocol_version='HTTP/1.1')
357
 
        self.start_server(server)
 
306
        server.setUp()
 
307
        self.addCleanup(server.tearDown)
358
308
        self.assertIsInstance(server._httpd,
359
309
                              http_server.TestingThreadingHTTPServer)
360
310
 
365
315
 
366
316
        server = http_server.HttpServer(RequestHandlerOneOne,
367
317
                                        protocol_version='HTTP/1.0')
368
 
        self.start_server(server)
 
318
        server.setUp()
 
319
        self.addCleanup(server.tearDown)
369
320
        self.assertIsInstance(server._httpd,
370
321
                              http_server.TestingHTTPServer)
371
322
 
374
325
    """Test case to inherit from if pycurl is present"""
375
326
 
376
327
    def _get_pycurl_maybe(self):
377
 
        self.requireFeature(features.pycurl)
378
 
        return PyCurlTransport
 
328
        try:
 
329
            from bzrlib.transport.http._pycurl import PyCurlTransport
 
330
            return PyCurlTransport
 
331
        except errors.DependencyNotPresent:
 
332
            raise tests.TestSkipped('pycurl not present')
379
333
 
380
334
    _transport = property(_get_pycurl_maybe)
381
335
 
388
342
    def test_url_parsing(self):
389
343
        f = FakeManager()
390
344
        url = http.extract_auth('http://example.com', f)
391
 
        self.assertEqual('http://example.com', url)
392
 
        self.assertEqual(0, len(f.credentials))
 
345
        self.assertEquals('http://example.com', url)
 
346
        self.assertEquals(0, len(f.credentials))
393
347
        url = http.extract_auth(
394
 
            'http://user:pass@example.com/bzr/bzr.dev', f)
395
 
        self.assertEqual('http://example.com/bzr/bzr.dev', url)
396
 
        self.assertEqual(1, len(f.credentials))
397
 
        self.assertEqual([None, 'example.com', 'user', 'pass'],
398
 
                         f.credentials[0])
 
348
            'http://user:pass@www.bazaar-vcs.org/bzr/bzr.dev', f)
 
349
        self.assertEquals('http://www.bazaar-vcs.org/bzr/bzr.dev', url)
 
350
        self.assertEquals(1, len(f.credentials))
 
351
        self.assertEquals([None, 'www.bazaar-vcs.org', 'user', 'pass'],
 
352
                          f.credentials[0])
399
353
 
400
354
 
401
355
class TestHttpTransportUrls(tests.TestCase):
428
382
    def test_http_impl_urls(self):
429
383
        """There are servers which ask for particular clients to connect"""
430
384
        server = self._server()
431
 
        server.start_server()
432
385
        try:
 
386
            server.setUp()
433
387
            url = server.get_url()
434
388
            self.assertTrue(url.startswith('%s://' % self._qualified_prefix))
435
389
        finally:
436
 
            server.stop_server()
 
390
            server.tearDown()
437
391
 
438
392
 
439
393
class TestHttps_pycurl(TestWithTransport_pycurl, tests.TestCase):
448
402
        https by supplying a fake version_info that do not
449
403
        support it.
450
404
        """
451
 
        self.requireFeature(features.pycurl)
452
 
        # Import the module locally now that we now it's available.
453
 
        pycurl = features.pycurl.module
 
405
        try:
 
406
            import pycurl
 
407
        except ImportError:
 
408
            raise tests.TestSkipped('pycurl not present')
454
409
 
455
 
        self.overrideAttr(pycurl, 'version_info',
456
 
                          # Fake the pycurl version_info This was taken from
457
 
                          # a windows pycurl without SSL (thanks to bialix)
458
 
                          lambda : (2,
459
 
                                    '7.13.2',
460
 
                                    462082,
461
 
                                    'i386-pc-win32',
462
 
                                    2576,
463
 
                                    None,
464
 
                                    0,
465
 
                                    None,
466
 
                                    ('ftp', 'gopher', 'telnet',
467
 
                                     'dict', 'ldap', 'http', 'file'),
468
 
                                    None,
469
 
                                    0,
470
 
                                    None))
471
 
        self.assertRaises(errors.DependencyNotPresent, self._transport,
472
 
                          'https://launchpad.net')
 
410
        version_info_orig = pycurl.version_info
 
411
        try:
 
412
            # Now that we have pycurl imported, we can fake its version_info
 
413
            # This was taken from a windows pycurl without SSL
 
414
            # (thanks to bialix)
 
415
            pycurl.version_info = lambda : (2,
 
416
                                            '7.13.2',
 
417
                                            462082,
 
418
                                            'i386-pc-win32',
 
419
                                            2576,
 
420
                                            None,
 
421
                                            0,
 
422
                                            None,
 
423
                                            ('ftp', 'gopher', 'telnet',
 
424
                                             'dict', 'ldap', 'http', 'file'),
 
425
                                            None,
 
426
                                            0,
 
427
                                            None)
 
428
            self.assertRaises(errors.DependencyNotPresent, self._transport,
 
429
                              'https://launchpad.net')
 
430
        finally:
 
431
            # Restore the right function
 
432
            pycurl.version_info = version_info_orig
473
433
 
474
434
 
475
435
class TestHTTPConnections(http_utils.TestCaseWithWebserver):
534
494
class TestPost(tests.TestCase):
535
495
 
536
496
    def test_post_body_is_received(self):
537
 
        server = RecordingServer(expect_body_tail='end-of-body',
538
 
            scheme=self._qualified_prefix)
539
 
        self.start_server(server)
540
 
        url = server.get_url()
 
497
        server = RecordingServer(expect_body_tail='end-of-body')
 
498
        server.setUp()
 
499
        self.addCleanup(server.tearDown)
 
500
        scheme = self._qualified_prefix
 
501
        url = '%s://%s:%s/' % (scheme, server.host, server.port)
541
502
        http_transport = self._transport(url)
542
503
        code, response = http_transport._post('abc def end-of-body')
543
504
        self.assertTrue(
592
553
                                      protocol_version=self._protocol_version)
593
554
 
594
555
    def _testing_pycurl(self):
595
 
        # TODO: This is duplicated for lots of the classes in this file
596
 
        return (features.pycurl.available()
597
 
                and self._transport == PyCurlTransport)
 
556
        return pycurl_present and self._transport == PyCurlTransport
598
557
 
599
558
 
600
559
class WallRequestHandler(http_server.TestingHTTPRequestHandler):
617
576
        # for details) make no distinction between a closed
618
577
        # socket and badly formatted status line, so we can't
619
578
        # just test for ConnectionError, we have to test
620
 
        # InvalidHttpResponse too. And pycurl may raise ConnectionReset
621
 
        # instead of ConnectionError too.
622
 
        self.assertRaises(( errors.ConnectionError, errors.ConnectionReset,
623
 
                            errors.InvalidHttpResponse),
 
579
        # InvalidHttpResponse too.
 
580
        self.assertRaises((errors.ConnectionError, errors.InvalidHttpResponse),
624
581
                          t.has, 'foo/bar')
625
582
 
626
583
    def test_http_get(self):
627
584
        server = self.get_readonly_server()
628
585
        t = self._transport(server.get_url())
629
 
        self.assertRaises((errors.ConnectionError, errors.ConnectionReset,
630
 
                           errors.InvalidHttpResponse),
 
586
        self.assertRaises((errors.ConnectionError, errors.InvalidHttpResponse),
631
587
                          t.get, 'foo/bar')
632
588
 
633
589
 
709
665
    _req_handler_class = BadProtocolRequestHandler
710
666
 
711
667
    def setUp(self):
712
 
        if self._testing_pycurl():
 
668
        if pycurl_present and self._transport == PyCurlTransport:
713
669
            raise tests.TestNotApplicable(
714
670
                "pycurl doesn't check the protocol version")
715
671
        super(TestBadProtocolServer, self).setUp()
759
715
        self.assertEqual(None, server.host)
760
716
        self.assertEqual(None, server.port)
761
717
 
762
 
    def test_setUp_and_stop(self):
 
718
    def test_setUp_and_tearDown(self):
763
719
        server = RecordingServer(expect_body_tail=None)
764
 
        server.start_server()
 
720
        server.setUp()
765
721
        try:
766
722
            self.assertNotEqual(None, server.host)
767
723
            self.assertNotEqual(None, server.port)
768
724
        finally:
769
 
            server.stop_server()
 
725
            server.tearDown()
770
726
        self.assertEqual(None, server.host)
771
727
        self.assertEqual(None, server.port)
772
728
 
773
729
    def test_send_receive_bytes(self):
774
 
        server = RecordingServer(expect_body_tail='c', scheme='http')
775
 
        self.start_server(server)
 
730
        server = RecordingServer(expect_body_tail='c')
 
731
        server.setUp()
 
732
        self.addCleanup(server.tearDown)
776
733
        sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
777
734
        sock.connect((server.host, server.port))
778
735
        sock.sendall('abc')
1169
1126
        if self._testing_pycurl():
1170
1127
            # Oh my ! pycurl does not check for the port as part of
1171
1128
            # no_proxy :-( So we just test the host part
1172
 
            self.no_proxy_host = self.server.host
 
1129
            self.no_proxy_host = 'localhost'
1173
1130
        else:
1174
1131
            self.no_proxy_host = self.proxy_address
1175
1132
        # The secondary server is the proxy
1178
1135
        self._old_env = {}
1179
1136
 
1180
1137
    def _testing_pycurl(self):
1181
 
        # TODO: This is duplicated for lots of the classes in this file
1182
 
        return (features.pycurl.available()
1183
 
                and self._transport == PyCurlTransport)
 
1138
        return pycurl_present and self._transport == PyCurlTransport
1184
1139
 
1185
1140
    def create_transport_secondary_server(self):
1186
1141
        """Creates an http server that will serve files with
1355
1310
        # Since the tests using this class will replace
1356
1311
        # _urllib2_wrappers.Request, we can't just call the base class __init__
1357
1312
        # or we'll loop.
1358
 
        RedirectedRequest.init_orig(self, method, url, *args, **kwargs)
 
1313
        RedirectedRequest.init_orig(self, method, url, args, kwargs)
1359
1314
        self.follow_redirections = True
1360
1315
 
1361
1316
 
1362
 
def install_redirected_request(test):
1363
 
    test.overrideAttr(_urllib2_wrappers, 'Request', RedirectedRequest)
1364
 
 
1365
 
 
1366
1317
class TestHTTPSilentRedirections(http_utils.TestCaseWithRedirectedWebserver):
1367
1318
    """Test redirections.
1368
1319
 
1378
1329
    """
1379
1330
 
1380
1331
    def setUp(self):
1381
 
        if (features.pycurl.available()
1382
 
            and self._transport == PyCurlTransport):
 
1332
        if pycurl_present and self._transport == PyCurlTransport:
1383
1333
            raise tests.TestNotApplicable(
1384
1334
                "pycurl doesn't redirect silently annymore")
1385
1335
        super(TestHTTPSilentRedirections, self).setUp()
1386
 
        install_redirected_request(self)
 
1336
        self.setup_redirected_request()
 
1337
        self.addCleanup(self.cleanup_redirected_request)
1387
1338
        self.build_tree_contents([('a','a'),
1388
1339
                                  ('1/',),
1389
1340
                                  ('1/a', 'redirected once'),
1399
1350
 
1400
1351
        self.old_transport = self._transport(self.old_server.get_url())
1401
1352
 
 
1353
    def setup_redirected_request(self):
 
1354
        self.original_class = _urllib2_wrappers.Request
 
1355
        _urllib2_wrappers.Request = RedirectedRequest
 
1356
 
 
1357
    def cleanup_redirected_request(self):
 
1358
        _urllib2_wrappers.Request = self.original_class
 
1359
 
1402
1360
    def create_transport_secondary_server(self):
1403
1361
        """Create the secondary server, redirections are defined in the tests"""
1404
1362
        return http_utils.HTTPServerRedirecting(
1408
1366
        t = self.old_transport
1409
1367
 
1410
1368
        req = RedirectedRequest('GET', t.abspath('a'))
 
1369
        req.follow_redirections = True
1411
1370
        new_prefix = 'http://%s:%s' % (self.new_server.host,
1412
1371
                                       self.new_server.port)
1413
1372
        self.old_server.redirections = \
1414
1373
            [('(.*)', r'%s/1\1' % (new_prefix), 301),]
1415
 
        self.assertEqual('redirected once',t._perform(req).read())
 
1374
        self.assertEquals('redirected once',t._perform(req).read())
1416
1375
 
1417
1376
    def test_five_redirections(self):
1418
1377
        t = self.old_transport
1419
1378
 
1420
1379
        req = RedirectedRequest('GET', t.abspath('a'))
 
1380
        req.follow_redirections = True
1421
1381
        old_prefix = 'http://%s:%s' % (self.old_server.host,
1422
1382
                                       self.old_server.port)
1423
1383
        new_prefix = 'http://%s:%s' % (self.new_server.host,
1429
1389
            ('/4(.*)', r'%s/5\1' % (new_prefix), 301),
1430
1390
            ('(/[^/]+)', r'%s/1\1' % (old_prefix), 301),
1431
1391
            ]
1432
 
        self.assertEqual('redirected 5 times',t._perform(req).read())
 
1392
        self.assertEquals('redirected 5 times',t._perform(req).read())
1433
1393
 
1434
1394
 
1435
1395
class TestDoCatchRedirections(http_utils.TestCaseWithRedirectedWebserver):
1448
1408
        t = self._transport(self.new_server.get_url())
1449
1409
 
1450
1410
        # We use None for redirected so that we fail if redirected
1451
 
        self.assertEqual('0123456789',
1452
 
                         transport.do_catching_redirections(
 
1411
        self.assertEquals('0123456789',
 
1412
                          transport.do_catching_redirections(
1453
1413
                self.get_a, t, None).read())
1454
1414
 
1455
1415
    def test_one_redirection(self):
1460
1420
            dir, file = urlutils.split(exception.target)
1461
1421
            return self._transport(dir)
1462
1422
 
1463
 
        self.assertEqual('0123456789',
1464
 
                         transport.do_catching_redirections(
 
1423
        self.assertEquals('0123456789',
 
1424
                          transport.do_catching_redirections(
1465
1425
                self.get_a, self.old_transport, redirected).read())
1466
 
        self.assertEqual(1, self.redirections)
 
1426
        self.assertEquals(1, self.redirections)
1467
1427
 
1468
1428
    def test_redirection_loop(self):
1469
1429
 
1483
1443
 
1484
1444
    _auth_header = 'Authorization'
1485
1445
    _password_prompt_prefix = ''
1486
 
    _username_prompt_prefix = ''
1487
 
    # Set by load_tests
1488
 
    _auth_server = None
1489
1446
 
1490
1447
    def setUp(self):
1491
1448
        super(TestAuth, self).setUp()
1494
1451
                                  ('b', 'contents of b\n'),])
1495
1452
 
1496
1453
    def create_transport_readonly_server(self):
1497
 
        return self._auth_server(protocol_version=self._protocol_version)
 
1454
        if self._auth_scheme == 'basic':
 
1455
            server = http_utils.HTTPBasicAuthServer(
 
1456
                protocol_version=self._protocol_version)
 
1457
        else:
 
1458
            if self._auth_scheme != 'digest':
 
1459
                raise AssertionError('Unknown auth scheme: %r'
 
1460
                                     % self._auth_scheme)
 
1461
            server = http_utils.HTTPDigestAuthServer(
 
1462
                protocol_version=self._protocol_version)
 
1463
        return server
1498
1464
 
1499
1465
    def _testing_pycurl(self):
1500
 
        # TODO: This is duplicated for lots of the classes in this file
1501
 
        return (features.pycurl.available()
1502
 
                and self._transport == PyCurlTransport)
 
1466
        return pycurl_present and self._transport == PyCurlTransport
1503
1467
 
1504
1468
    def get_user_url(self, user, password):
1505
1469
        """Build an url embedding user and password"""
1553
1517
        # initial 'who are you' and 'this is not you, who are you')
1554
1518
        self.assertEqual(2, self.server.auth_required_errors)
1555
1519
 
1556
 
    def test_prompt_for_username(self):
1557
 
        if self._testing_pycurl():
1558
 
            raise tests.TestNotApplicable(
1559
 
                'pycurl cannot prompt, it handles auth by embedding'
1560
 
                ' user:pass in urls only')
1561
 
 
1562
 
        self.server.add_user('joe', 'foo')
1563
 
        t = self.get_user_transport(None, None)
1564
 
        stdout = tests.StringIOWrapper()
1565
 
        stderr = tests.StringIOWrapper()
1566
 
        ui.ui_factory = tests.TestUIFactory(stdin='joe\nfoo\n',
1567
 
                                            stdout=stdout, stderr=stderr)
1568
 
        self.assertEqual('contents of a\n',t.get('a').read())
1569
 
        # stdin should be empty
1570
 
        self.assertEqual('', ui.ui_factory.stdin.readline())
1571
 
        stderr.seek(0)
1572
 
        expected_prompt = self._expected_username_prompt(t._unqualified_scheme)
1573
 
        self.assertEqual(expected_prompt, stderr.read(len(expected_prompt)))
1574
 
        self.assertEqual('', stdout.getvalue())
1575
 
        self._check_password_prompt(t._unqualified_scheme, 'joe',
1576
 
                                    stderr.readline())
1577
 
 
1578
1520
    def test_prompt_for_password(self):
1579
1521
        if self._testing_pycurl():
1580
1522
            raise tests.TestNotApplicable(
1584
1526
        self.server.add_user('joe', 'foo')
1585
1527
        t = self.get_user_transport('joe', None)
1586
1528
        stdout = tests.StringIOWrapper()
1587
 
        stderr = tests.StringIOWrapper()
1588
 
        ui.ui_factory = tests.TestUIFactory(stdin='foo\n',
1589
 
                                            stdout=stdout, stderr=stderr)
1590
 
        self.assertEqual('contents of a\n', t.get('a').read())
 
1529
        ui.ui_factory = tests.TestUIFactory(stdin='foo\n', stdout=stdout)
 
1530
        self.assertEqual('contents of a\n',t.get('a').read())
1591
1531
        # stdin should be empty
1592
1532
        self.assertEqual('', ui.ui_factory.stdin.readline())
1593
1533
        self._check_password_prompt(t._unqualified_scheme, 'joe',
1594
 
                                    stderr.getvalue())
1595
 
        self.assertEqual('', stdout.getvalue())
 
1534
                                    stdout.getvalue())
1596
1535
        # And we shouldn't prompt again for a different request
1597
1536
        # against the same transport.
1598
1537
        self.assertEqual('contents of b\n',t.get('b').read())
1608
1547
                              % (scheme.upper(),
1609
1548
                                 user, self.server.host, self.server.port,
1610
1549
                                 self.server.auth_realm)))
1611
 
        self.assertEqual(expected_prompt, actual_prompt)
1612
 
 
1613
 
    def _expected_username_prompt(self, scheme):
1614
 
        return (self._username_prompt_prefix
1615
 
                + "%s %s:%d, Realm: '%s' username: " % (scheme.upper(),
1616
 
                                 self.server.host, self.server.port,
1617
 
                                 self.server.auth_realm))
 
1550
        self.assertEquals(expected_prompt, actual_prompt)
1618
1551
 
1619
1552
    def test_no_prompt_for_password_when_using_auth_config(self):
1620
1553
        if self._testing_pycurl():
1628
1561
        self.server.add_user(user, password)
1629
1562
        t = self.get_user_transport(user, None)
1630
1563
        ui.ui_factory = tests.TestUIFactory(stdin=stdin_content,
1631
 
                                            stderr=tests.StringIOWrapper())
 
1564
                                            stdout=tests.StringIOWrapper())
1632
1565
        # Create a minimal config file with the right password
1633
1566
        conf = config.AuthenticationConfig()
1634
1567
        conf._get_config().update(
1662
1595
        self.assertEqual(1, self.server.auth_required_errors)
1663
1596
 
1664
1597
    def test_changing_nonce(self):
1665
 
        if self._auth_server not in (http_utils.HTTPDigestAuthServer,
1666
 
                                     http_utils.ProxyDigestAuthServer):
1667
 
            raise tests.TestNotApplicable('HTTP/proxy auth digest only test')
 
1598
        if self._auth_scheme != 'digest':
 
1599
            raise tests.TestNotApplicable('HTTP auth digest only test')
1668
1600
        if self._testing_pycurl():
1669
1601
            raise tests.KnownFailure(
1670
1602
                'pycurl does not handle a nonce change')
1688
1620
    """Test proxy authentication schemes."""
1689
1621
 
1690
1622
    _auth_header = 'Proxy-authorization'
1691
 
    _password_prompt_prefix = 'Proxy '
1692
 
    _username_prompt_prefix = 'Proxy '
 
1623
    _password_prompt_prefix='Proxy '
1693
1624
 
1694
1625
    def setUp(self):
1695
1626
        super(TestProxyAuth, self).setUp()
1702
1633
                                  ('b-proxied', 'contents of b\n'),
1703
1634
                                  ])
1704
1635
 
 
1636
    def create_transport_readonly_server(self):
 
1637
        if self._auth_scheme == 'basic':
 
1638
            server = http_utils.ProxyBasicAuthServer(
 
1639
                protocol_version=self._protocol_version)
 
1640
        else:
 
1641
            if self._auth_scheme != 'digest':
 
1642
                raise AssertionError('Unknown auth scheme: %r'
 
1643
                                     % self._auth_scheme)
 
1644
            server = http_utils.ProxyDigestAuthServer(
 
1645
                protocol_version=self._protocol_version)
 
1646
        return server
 
1647
 
1705
1648
    def get_user_transport(self, user, password):
1706
1649
        self._install_env({'all_proxy': self.get_user_url(user, password)})
1707
1650
        return self._transport(self.server.get_url())
1846
1789
                             'http://www.example.com/foo/subdir')
1847
1790
        self.assertIsInstance(r, type(t))
1848
1791
        # Both transports share the some connection
1849
 
        self.assertEqual(t._get_connection(), r._get_connection())
 
1792
        self.assertEquals(t._get_connection(), r._get_connection())
1850
1793
 
1851
1794
    def test_redirected_to_self_with_slash(self):
1852
1795
        t = self._transport('http://www.example.com/foo')
1856
1799
        # Both transports share the some connection (one can argue that we
1857
1800
        # should return the exact same transport here, but that seems
1858
1801
        # overkill).
1859
 
        self.assertEqual(t._get_connection(), r._get_connection())
 
1802
        self.assertEquals(t._get_connection(), r._get_connection())
1860
1803
 
1861
1804
    def test_redirected_to_host(self):
1862
1805
        t = self._transport('http://www.example.com/foo')
1881
1824
        r = t._redirected_to('http://www.example.com/foo',
1882
1825
                             'https://foo.example.com/foo')
1883
1826
        self.assertIsInstance(r, type(t))
1884
 
        self.assertEqual(t._user, r._user)
 
1827
        self.assertEquals(t._user, r._user)
1885
1828
 
1886
1829
 
1887
1830
class PredefinedRequestHandler(http_server.TestingHTTPRequestHandler):
1946
1889
        pass
1947
1890
 
1948
1891
 
1949
 
class TestActivityMixin(object):
 
1892
class TestActivity(tests.TestCase):
1950
1893
    """Test socket activity reporting.
1951
1894
 
1952
1895
    We use a special purpose server to control the bytes sent and received and
1956
1899
    def setUp(self):
1957
1900
        tests.TestCase.setUp(self)
1958
1901
        self.server = self._activity_server(self._protocol_version)
1959
 
        self.server.start_server()
 
1902
        self.server.setUp()
1960
1903
        self.activities = {}
1961
1904
        def report_activity(t, bytes, direction):
1962
1905
            count = self.activities.get(direction, 0)
1965
1908
 
1966
1909
        # We override at class level because constructors may propagate the
1967
1910
        # bound method and render instance overriding ineffective (an
1968
 
        # alternative would be to define a specific ui factory instead...)
 
1911
        # alternative would be be to define a specific ui factory instead...)
1969
1912
        self.orig_report_activity = self._transport._report_activity
1970
1913
        self._transport._report_activity = report_activity
1971
1914
 
1972
1915
    def tearDown(self):
1973
1916
        self._transport._report_activity = self.orig_report_activity
1974
 
        self.server.stop_server()
 
1917
        self.server.tearDown()
1975
1918
        tests.TestCase.tearDown(self)
1976
1919
 
1977
1920
    def get_transport(self):
2090
2033
        code, f = t._post('abc def end-of-body\n')
2091
2034
        self.assertEqual('lalala whatever as long as itsssss\n', f.read())
2092
2035
        self.assertActivitiesMatch()
2093
 
 
2094
 
 
2095
 
class TestActivity(tests.TestCase, TestActivityMixin):
2096
 
 
2097
 
    def setUp(self):
2098
 
        tests.TestCase.setUp(self)
2099
 
        self.server = self._activity_server(self._protocol_version)
2100
 
        self.server.start_server()
2101
 
        self.activities = {}
2102
 
        def report_activity(t, bytes, direction):
2103
 
            count = self.activities.get(direction, 0)
2104
 
            count += bytes
2105
 
            self.activities[direction] = count
2106
 
 
2107
 
        # We override at class level because constructors may propagate the
2108
 
        # bound method and render instance overriding ineffective (an
2109
 
        # alternative would be to define a specific ui factory instead...)
2110
 
        self.orig_report_activity = self._transport._report_activity
2111
 
        self._transport._report_activity = report_activity
2112
 
 
2113
 
    def tearDown(self):
2114
 
        self._transport._report_activity = self.orig_report_activity
2115
 
        self.server.stop_server()
2116
 
        tests.TestCase.tearDown(self)
2117
 
 
2118
 
 
2119
 
class TestNoReportActivity(tests.TestCase, TestActivityMixin):
2120
 
 
2121
 
    def setUp(self):
2122
 
        tests.TestCase.setUp(self)
2123
 
        # Unlike TestActivity, we are really testing ReportingFileSocket and
2124
 
        # ReportingSocket, so we don't need all the parametrization. Since
2125
 
        # ReportingFileSocket and ReportingSocket are wrappers, it's easier to
2126
 
        # test them through their use by the transport than directly (that's a
2127
 
        # bit less clean but far more simpler and effective).
2128
 
        self.server = ActivityHTTPServer('HTTP/1.1')
2129
 
        self._transport=_urllib.HttpTransport_urllib
2130
 
 
2131
 
        self.server.start_server()
2132
 
 
2133
 
        # We override at class level because constructors may propagate the
2134
 
        # bound method and render instance overriding ineffective (an
2135
 
        # alternative would be to define a specific ui factory instead...)
2136
 
        self.orig_report_activity = self._transport._report_activity
2137
 
        self._transport._report_activity = None
2138
 
 
2139
 
    def tearDown(self):
2140
 
        self._transport._report_activity = self.orig_report_activity
2141
 
        self.server.stop_server()
2142
 
        tests.TestCase.tearDown(self)
2143
 
 
2144
 
    def assertActivitiesMatch(self):
2145
 
        # Nothing to check here
2146
 
        pass
2147
 
 
2148
 
 
2149
 
class TestAuthOnRedirected(http_utils.TestCaseWithRedirectedWebserver):
2150
 
    """Test authentication on the redirected http server."""
2151
 
 
2152
 
    _auth_header = 'Authorization'
2153
 
    _password_prompt_prefix = ''
2154
 
    _username_prompt_prefix = ''
2155
 
    _auth_server = http_utils.HTTPBasicAuthServer
2156
 
    _transport = _urllib.HttpTransport_urllib
2157
 
 
2158
 
    def create_transport_readonly_server(self):
2159
 
        return self._auth_server()
2160
 
 
2161
 
    def create_transport_secondary_server(self):
2162
 
        """Create the secondary server redirecting to the primary server"""
2163
 
        new = self.get_readonly_server()
2164
 
 
2165
 
        redirecting = http_utils.HTTPServerRedirecting()
2166
 
        redirecting.redirect_to(new.host, new.port)
2167
 
        return redirecting
2168
 
 
2169
 
    def setUp(self):
2170
 
        super(TestAuthOnRedirected, self).setUp()
2171
 
        self.build_tree_contents([('a','a'),
2172
 
                                  ('1/',),
2173
 
                                  ('1/a', 'redirected once'),
2174
 
                                  ],)
2175
 
        new_prefix = 'http://%s:%s' % (self.new_server.host,
2176
 
                                       self.new_server.port)
2177
 
        self.old_server.redirections = [
2178
 
            ('(.*)', r'%s/1\1' % (new_prefix), 301),]
2179
 
        self.old_transport = self._transport(self.old_server.get_url())
2180
 
        self.new_server.add_user('joe', 'foo')
2181
 
 
2182
 
    def get_a(self, transport):
2183
 
        return transport.get('a')
2184
 
 
2185
 
    def test_auth_on_redirected_via_do_catching_redirections(self):
2186
 
        self.redirections = 0
2187
 
 
2188
 
        def redirected(transport, exception, redirection_notice):
2189
 
            self.redirections += 1
2190
 
            dir, file = urlutils.split(exception.target)
2191
 
            return self._transport(dir)
2192
 
 
2193
 
        stdout = tests.StringIOWrapper()
2194
 
        stderr = tests.StringIOWrapper()
2195
 
        ui.ui_factory = tests.TestUIFactory(stdin='joe\nfoo\n',
2196
 
                                            stdout=stdout, stderr=stderr)
2197
 
        self.assertEqual('redirected once',
2198
 
                         transport.do_catching_redirections(
2199
 
                self.get_a, self.old_transport, redirected).read())
2200
 
        self.assertEqual(1, self.redirections)
2201
 
        # stdin should be empty
2202
 
        self.assertEqual('', ui.ui_factory.stdin.readline())
2203
 
        # stdout should be empty, stderr will contains the prompts
2204
 
        self.assertEqual('', stdout.getvalue())
2205
 
 
2206
 
    def test_auth_on_redirected_via_following_redirections(self):
2207
 
        self.new_server.add_user('joe', 'foo')
2208
 
        stdout = tests.StringIOWrapper()
2209
 
        stderr = tests.StringIOWrapper()
2210
 
        ui.ui_factory = tests.TestUIFactory(stdin='joe\nfoo\n',
2211
 
                                            stdout=stdout, stderr=stderr)
2212
 
        t = self.old_transport
2213
 
        req = RedirectedRequest('GET', t.abspath('a'))
2214
 
        new_prefix = 'http://%s:%s' % (self.new_server.host,
2215
 
                                       self.new_server.port)
2216
 
        self.old_server.redirections = [
2217
 
            ('(.*)', r'%s/1\1' % (new_prefix), 301),]
2218
 
        self.assertEqual('redirected once',t._perform(req).read())
2219
 
        # stdin should be empty
2220
 
        self.assertEqual('', ui.ui_factory.stdin.readline())
2221
 
        # stdout should be empty, stderr will contains the prompts
2222
 
        self.assertEqual('', stdout.getvalue())
2223
 
 
2224