20
20
transport implementation, http protocol versions and authentication schemes.
23
# TODO: Should be renamed to bzrlib.transport.http.tests?
24
# TODO: What about renaming to bzrlib.tests.transport.http ?
23
# TODO: Should be renamed to breezy.transport.http.tests?
24
# TODO: What about renaming to breezy.tests.transport.http ?
26
from cStringIO import StringIO
30
28
import SimpleHTTPServer
41
40
remote as _mod_remote,
47
from bzrlib.symbol_versioning import (
50
from bzrlib.tests import (
55
from bzrlib.transport import (
52
from .scenarios import (
53
load_tests_apply_scenarios,
56
from ..transport import (
59
from bzrlib.transport.http import (
60
from ..transport.http import (
65
if features.pycurl.available():
66
from bzrlib.transport.http._pycurl import PyCurlTransport
69
def load_tests(standard_tests, module, loader):
70
"""Multiply tests for http clients and protocol versions."""
71
result = loader.suiteClass()
73
# one for each transport implementation
74
t_tests, remaining_tests = tests.split_suite_by_condition(
75
standard_tests, tests.condition_isinstance((
76
TestHttpTransportRegistration,
77
TestHttpTransportUrls,
66
load_tests = load_tests_apply_scenarios
69
def vary_by_http_client_implementation():
70
"""Test the libraries we can use, currently just urllib."""
80
71
transport_scenarios = [
81
72
('urllib', dict(_transport=_urllib.HttpTransport_urllib,
82
73
_server=http_server.HttpServer_urllib,
83
_qualified_prefix='http+urllib',)),
85
if features.pycurl.available():
86
transport_scenarios.append(
87
('pycurl', dict(_transport=PyCurlTransport,
88
_server=http_server.HttpServer_PyCurl,
89
_qualified_prefix='http+pycurl',)))
90
tests.multiply_tests(t_tests, transport_scenarios, result)
92
# each implementation tested with each HTTP version
93
tp_tests, remaining_tests = tests.split_suite_by_condition(
94
remaining_tests, tests.condition_isinstance((
95
SmartHTTPTunnellingTest,
96
TestDoCatchRedirections,
99
TestHTTPSilentRedirections,
100
TestLimitedRangeRequestServer,
104
TestSpecificRequestHandler,
106
protocol_scenarios = [
107
('HTTP/1.0', dict(_protocol_version='HTTP/1.0')),
108
('HTTP/1.1', dict(_protocol_version='HTTP/1.1')),
110
tp_scenarios = tests.multiply_scenarios(transport_scenarios,
112
tests.multiply_tests(tp_tests, tp_scenarios, result)
114
# proxy auth: each auth scheme on all http versions on all implementations.
115
tppa_tests, remaining_tests = tests.split_suite_by_condition(
116
remaining_tests, tests.condition_isinstance((
119
proxy_auth_scheme_scenarios = [
120
('basic', dict(_auth_server=http_utils.ProxyBasicAuthServer)),
121
('digest', dict(_auth_server=http_utils.ProxyDigestAuthServer)),
123
dict(_auth_server=http_utils.ProxyBasicAndDigestAuthServer)),
125
tppa_scenarios = tests.multiply_scenarios(tp_scenarios,
126
proxy_auth_scheme_scenarios)
127
tests.multiply_tests(tppa_tests, tppa_scenarios, result)
129
# auth: each auth scheme on all http versions on all implementations.
130
tpa_tests, remaining_tests = tests.split_suite_by_condition(
131
remaining_tests, tests.condition_isinstance((
134
auth_scheme_scenarios = [
74
_url_protocol='http+urllib',)),
76
return transport_scenarios
79
def vary_by_http_protocol_version():
80
"""Test on http/1.0 and 1.1"""
82
('HTTP/1.0', dict(_protocol_version='HTTP/1.0')),
83
('HTTP/1.1', dict(_protocol_version='HTTP/1.1')),
87
def vary_by_http_auth_scheme():
135
89
('basic', dict(_auth_server=http_utils.HTTPBasicAuthServer)),
136
90
('digest', dict(_auth_server=http_utils.HTTPDigestAuthServer)),
138
dict(_auth_server=http_utils.HTTPBasicAndDigestAuthServer)),
140
tpa_scenarios = tests.multiply_scenarios(tp_scenarios,
141
auth_scheme_scenarios)
142
tests.multiply_tests(tpa_tests, tpa_scenarios, result)
144
# activity: on all http[s] versions on all implementations
145
tpact_tests, remaining_tests = tests.split_suite_by_condition(
146
remaining_tests, tests.condition_isinstance((
92
dict(_auth_server=http_utils.HTTPBasicAndDigestAuthServer)),
94
# Add some attributes common to all scenarios
95
for scenario_id, scenario_dict in scenarios:
96
scenario_dict.update(_auth_header='Authorization',
97
_username_prompt_prefix='',
98
_password_prompt_prefix='')
102
def vary_by_http_proxy_auth_scheme():
104
('proxy-basic', dict(_auth_server=http_utils.ProxyBasicAuthServer)),
105
('proxy-digest', dict(_auth_server=http_utils.ProxyDigestAuthServer)),
106
('proxy-basicdigest',
107
dict(_auth_server=http_utils.ProxyBasicAndDigestAuthServer)),
109
# Add some attributes common to all scenarios
110
for scenario_id, scenario_dict in scenarios:
111
scenario_dict.update(_auth_header='Proxy-Authorization',
112
_username_prompt_prefix='Proxy ',
113
_password_prompt_prefix='Proxy ')
117
def vary_by_http_activity():
149
118
activity_scenarios = [
150
119
('urllib,http', dict(_activity_server=ActivityHTTPServer,
151
_transport=_urllib.HttpTransport_urllib,)),
120
_transport=_urllib.HttpTransport_urllib,)),
153
if tests.HTTPSServerFeature.available():
122
if features.HTTPSServerFeature.available():
123
# FIXME: Until we have a better way to handle self-signed certificates
124
# (like allowing them in a test specific authentication.conf for
125
# example), we need some specialized urllib transport for tests.
130
class HTTPS_urllib_transport(_urllib.HttpTransport_urllib):
132
def __init__(self, base, _from_transport=None):
133
super(HTTPS_urllib_transport, self).__init__(
134
base, _from_transport=_from_transport,
135
ca_certs=ssl_certs.build_path('ca.crt'))
154
137
activity_scenarios.append(
155
138
('urllib,https', dict(_activity_server=ActivityHTTPSServer,
156
_transport=_urllib.HttpTransport_urllib,)),)
157
if features.pycurl.available():
158
activity_scenarios.append(
159
('pycurl,http', dict(_activity_server=ActivityHTTPServer,
160
_transport=PyCurlTransport,)),)
161
if tests.HTTPSServerFeature.available():
162
from bzrlib.tests import (
165
# FIXME: Until we have a better way to handle self-signed
166
# certificates (like allowing them in a test specific
167
# authentication.conf for example), we need some specialized pycurl
168
# transport for tests.
169
class HTTPS_pycurl_transport(PyCurlTransport):
171
def __init__(self, base, _from_transport=None):
172
super(HTTPS_pycurl_transport, self).__init__(
173
base, _from_transport)
174
self.cabundle = str(ssl_certs.build_path('ca.crt'))
176
activity_scenarios.append(
177
('pycurl,https', dict(_activity_server=ActivityHTTPSServer,
178
_transport=HTTPS_pycurl_transport,)),)
180
tpact_scenarios = tests.multiply_scenarios(activity_scenarios,
182
tests.multiply_tests(tpact_tests, tpact_scenarios, result)
184
# No parametrization for the remaining tests
185
result.addTests(remaining_tests)
139
_transport=HTTPS_urllib_transport,)),)
140
return activity_scenarios
190
143
class FakeManager(object):
296
262
self.assertEqual('realm="Thou should not pass"', remainder)
265
class TestHTTPRangeParsing(tests.TestCase):
268
super(TestHTTPRangeParsing, self).setUp()
269
# We focus on range parsing here and ignore everything else
270
class RequestHandler(http_server.TestingHTTPRequestHandler):
271
def setup(self): pass
272
def handle(self): pass
273
def finish(self): pass
275
self.req_handler = RequestHandler(None, None, None)
277
def assertRanges(self, ranges, header, file_size):
278
self.assertEqual(ranges,
279
self.req_handler._parse_ranges(header, file_size))
281
def test_simple_range(self):
282
self.assertRanges([(0,2)], 'bytes=0-2', 12)
285
self.assertRanges([(8, 11)], 'bytes=-4', 12)
287
def test_tail_bigger_than_file(self):
288
self.assertRanges([(0, 11)], 'bytes=-99', 12)
290
def test_range_without_end(self):
291
self.assertRanges([(4, 11)], 'bytes=4-', 12)
293
def test_invalid_ranges(self):
294
self.assertRanges(None, 'bytes=12-22', 12)
295
self.assertRanges(None, 'bytes=1-3,12-22', 12)
296
self.assertRanges(None, 'bytes=-', 12)
299
299
class TestHTTPServer(tests.TestCase):
300
300
"""Test the HTTP servers implementations."""
366
357
server = http_server.HttpServer(RequestHandlerOneOne,
367
358
protocol_version='HTTP/1.0')
368
359
self.start_server(server)
369
self.assertIsInstance(server._httpd,
360
self.assertIsInstance(server.server,
370
361
http_server.TestingHTTPServer)
373
class TestWithTransport_pycurl(object):
374
"""Test case to inherit from if pycurl is present"""
376
def _get_pycurl_maybe(self):
377
self.requireFeature(features.pycurl)
378
return PyCurlTransport
380
_transport = property(_get_pycurl_maybe)
383
class TestHttpUrls(tests.TestCase):
385
# TODO: This should be moved to authorization tests once they
388
def test_url_parsing(self):
390
url = http.extract_auth('http://example.com', f)
391
self.assertEqual('http://example.com', url)
392
self.assertEqual(0, len(f.credentials))
393
url = http.extract_auth(
394
'http://user:pass@example.com/bzr/bzr.dev', f)
395
self.assertEqual('http://example.com/bzr/bzr.dev', url)
396
self.assertEqual(1, len(f.credentials))
397
self.assertEqual([None, 'example.com', 'user', 'pass'],
401
364
class TestHttpTransportUrls(tests.TestCase):
402
365
"""Test the http urls."""
367
scenarios = vary_by_http_client_implementation()
404
369
def test_abs_url(self):
405
370
"""Construction of absolute http URLs"""
406
t = self._transport('http://bazaar-vcs.org/bzr/bzr.dev/')
371
t = self._transport('http://example.com/bzr/bzr.dev/')
407
372
eq = self.assertEqualDiff
408
eq(t.abspath('.'), 'http://bazaar-vcs.org/bzr/bzr.dev')
409
eq(t.abspath('foo/bar'), 'http://bazaar-vcs.org/bzr/bzr.dev/foo/bar')
410
eq(t.abspath('.bzr'), 'http://bazaar-vcs.org/bzr/bzr.dev/.bzr')
373
eq(t.abspath('.'), 'http://example.com/bzr/bzr.dev')
374
eq(t.abspath('foo/bar'), 'http://example.com/bzr/bzr.dev/foo/bar')
375
eq(t.abspath('.bzr'), 'http://example.com/bzr/bzr.dev/.bzr')
411
376
eq(t.abspath('.bzr/1//2/./3'),
412
'http://bazaar-vcs.org/bzr/bzr.dev/.bzr/1/2/3')
377
'http://example.com/bzr/bzr.dev/.bzr/1/2/3')
414
379
def test_invalid_http_urls(self):
415
380
"""Trap invalid construction of urls"""
416
t = self._transport('http://bazaar-vcs.org/bzr/bzr.dev/')
381
self._transport('http://example.com/bzr/bzr.dev/')
417
382
self.assertRaises(errors.InvalidURL,
419
'http://http://bazaar-vcs.org/bzr/bzr.dev/')
384
'http://http://example.com/bzr/bzr.dev/')
421
386
def test_http_root_urls(self):
422
387
"""Construction of URLs from server root"""
423
t = self._transport('http://bzr.ozlabs.org/')
388
t = self._transport('http://example.com/')
424
389
eq = self.assertEqualDiff
425
390
eq(t.abspath('.bzr/tree-version'),
426
'http://bzr.ozlabs.org/.bzr/tree-version')
391
'http://example.com/.bzr/tree-version')
428
393
def test_http_impl_urls(self):
429
394
"""There are servers which ask for particular clients to connect"""
431
396
server.start_server()
433
398
url = server.get_url()
434
self.assertTrue(url.startswith('%s://' % self._qualified_prefix))
399
self.assertTrue(url.startswith('%s://' % self._url_protocol))
436
401
server.stop_server()
439
class TestHttps_pycurl(TestWithTransport_pycurl, tests.TestCase):
441
# TODO: This should really be moved into another pycurl
442
# specific test. When https tests will be implemented, take
443
# this one into account.
444
def test_pycurl_without_https_support(self):
445
"""Test that pycurl without SSL do not fail with a traceback.
447
For the purpose of the test, we force pycurl to ignore
448
https by supplying a fake version_info that do not
451
self.requireFeature(features.pycurl)
452
# Import the module locally now that we now it's available.
453
pycurl = features.pycurl.module
455
self.overrideAttr(pycurl, 'version_info',
456
# Fake the pycurl version_info This was taken from
457
# a windows pycurl without SSL (thanks to bialix)
466
('ftp', 'gopher', 'telnet',
467
'dict', 'ldap', 'http', 'file'),
471
self.assertRaises(errors.DependencyNotPresent, self._transport,
472
'https://launchpad.net')
475
404
class TestHTTPConnections(http_utils.TestCaseWithWebserver):
476
405
"""Test the http connections."""
407
scenarios = multiply_scenarios(
408
vary_by_http_client_implementation(),
409
vary_by_http_protocol_version(),
479
http_utils.TestCaseWithWebserver.setUp(self)
413
super(TestHTTPConnections, self).setUp()
480
414
self.build_tree(['foo/', 'foo/bar'], line_endings='binary',
481
415
transport=self.get_transport())
483
417
def test_http_has(self):
484
418
server = self.get_readonly_server()
485
t = self._transport(server.get_url())
419
t = self.get_readonly_transport()
486
420
self.assertEqual(t.has('foo/bar'), True)
487
421
self.assertEqual(len(server.logs), 1)
488
422
self.assertContainsRe(server.logs[0],
525
459
class TestHttpTransportRegistration(tests.TestCase):
526
460
"""Test registrations of various http implementations"""
462
scenarios = vary_by_http_client_implementation()
528
464
def test_http_registered(self):
529
t = transport.get_transport('%s://foo.com/' % self._qualified_prefix)
465
t = transport.get_transport_from_url(
466
'%s://foo.com/' % self._url_protocol)
530
467
self.assertIsInstance(t, transport.Transport)
531
468
self.assertIsInstance(t, self._transport)
534
471
class TestPost(tests.TestCase):
473
scenarios = multiply_scenarios(
474
vary_by_http_client_implementation(),
475
vary_by_http_protocol_version(),
536
478
def test_post_body_is_received(self):
537
479
server = RecordingServer(expect_body_tail='end-of-body',
538
scheme=self._qualified_prefix)
480
scheme=self._url_protocol)
539
481
self.start_server(server)
540
482
url = server.get_url()
541
http_transport = self._transport(url)
483
# FIXME: needs a cleanup -- vila 20100611
484
http_transport = transport.get_transport_from_url(url)
542
485
code, response = http_transport._post('abc def end-of-body')
544
487
server.received_bytes.startswith('POST /.bzr/smart HTTP/1.'))
545
488
self.assertTrue('content-length: 19\r' in server.received_bytes.lower())
489
self.assertTrue('content-type: application/octet-stream\r'
490
in server.received_bytes.lower())
546
491
# The transport should not be assuming that the server can accept
547
492
# chunked encoding the first time it connects, because HTTP/1.1, so we
548
493
# check for the literal string.
1022
960
def test_readv_with_short_reads(self):
1023
961
server = self.get_readonly_server()
1024
t = self._transport(server.get_url())
962
t = self.get_readonly_transport()
1025
963
# Force separate ranges for each offset
1026
964
t._bytes_to_read_before_seek = 0
1027
965
ireadv = iter(t.readv('a', ((0, 1), (2, 1), (4, 2), (9, 1))))
1028
self.assertEqual((0, '0'), ireadv.next())
1029
self.assertEqual((2, '2'), ireadv.next())
1030
if not self._testing_pycurl():
1031
# Only one request have been issued so far (except for pycurl that
1032
# try to read the whole response at once)
1033
self.assertEqual(1, server.GET_request_nb)
1034
self.assertEqual((4, '45'), ireadv.next())
1035
self.assertEqual((9, '9'), ireadv.next())
1036
# Both implementations issue 3 requests but:
1037
# - urllib does two multiple (4 ranges, then 2 ranges) then a single
1039
# - pycurl does two multiple (4 ranges, 4 ranges) then a single range
966
self.assertEqual((0, '0'), next(ireadv))
967
self.assertEqual((2, '2'), next(ireadv))
968
# Only one request have been issued so far
969
self.assertEqual(1, server.GET_request_nb)
970
self.assertEqual((4, '45'), next(ireadv))
971
self.assertEqual((9, '9'), next(ireadv))
972
# We issue 3 requests: two multiple (4 ranges, then 2 ranges) then a
1040
974
self.assertEqual(3, server.GET_request_nb)
1041
975
# Finally the client have tried a single range request and stays in
1043
977
self.assertEqual('single', t._range_hint)
980
class TruncatedBeforeBoundaryRequestHandler(
981
http_server.TestingHTTPRequestHandler):
982
"""Truncation before a boundary, like in bug 198646"""
984
_truncated_ranges = 1
986
def get_multiple_ranges(self, file, file_size, ranges):
987
self.send_response(206)
988
self.send_header('Accept-Ranges', 'bytes')
990
self.send_header('Content-Type',
991
'multipart/byteranges; boundary=%s' % boundary)
992
boundary_line = '--%s\r\n' % boundary
993
# Calculate the Content-Length
995
for (start, end) in ranges:
996
content_length += len(boundary_line)
997
content_length += self._header_line_length(
998
'Content-type', 'application/octet-stream')
999
content_length += self._header_line_length(
1000
'Content-Range', 'bytes %d-%d/%d' % (start, end, file_size))
1001
content_length += len('\r\n') # end headers
1002
content_length += end - start # + 1
1003
content_length += len(boundary_line)
1004
self.send_header('Content-length', content_length)
1007
# Send the multipart body
1009
for (start, end) in ranges:
1010
if cur + self._truncated_ranges >= len(ranges):
1011
# Abruptly ends the response and close the connection
1012
self.close_connection = 1
1014
self.wfile.write(boundary_line)
1015
self.send_header('Content-type', 'application/octet-stream')
1016
self.send_header('Content-Range', 'bytes %d-%d/%d'
1017
% (start, end, file_size))
1019
self.send_range_content(file, start, end - start + 1)
1022
self.wfile.write(boundary_line)
1025
class TestTruncatedBeforeBoundary(TestSpecificRequestHandler):
1026
"""Tests the case of bug 198646, disconnecting before a boundary."""
1028
_req_handler_class = TruncatedBeforeBoundaryRequestHandler
1031
super(TestTruncatedBeforeBoundary, self).setUp()
1032
self.build_tree_contents([('a', '0123456789')],)
1034
def test_readv_with_short_reads(self):
1035
server = self.get_readonly_server()
1036
t = self.get_readonly_transport()
1037
# Force separate ranges for each offset
1038
t._bytes_to_read_before_seek = 0
1039
ireadv = iter(t.readv('a', ((0, 1), (2, 1), (4, 2), (9, 1))))
1040
self.assertEqual((0, '0'), next(ireadv))
1041
self.assertEqual((2, '2'), next(ireadv))
1042
self.assertEqual((4, '45'), next(ireadv))
1043
self.assertEqual((9, '9'), next(ireadv))
1045
1046
class LimitedRangeRequestHandler(http_server.TestingHTTPRequestHandler):
1046
1047
"""Errors out when range specifiers exceed the limit"""
1114
1117
Only the urllib implementation is tested here.
1118
tests.TestCase.setUp(self)
1123
tests.TestCase.tearDown(self)
1125
def _install_env(self, env):
1126
for name, value in env.iteritems():
1127
self._old_env[name] = osutils.set_or_unset_env(name, value)
1129
def _restore_env(self):
1130
for name, value in self._old_env.iteritems():
1131
osutils.set_or_unset_env(name, value)
1133
1120
def _proxied_request(self):
1134
1121
handler = _urllib2_wrappers.ProxyHandler()
1135
request = _urllib2_wrappers.Request('GET','http://baz/buzzle')
1122
request = _urllib2_wrappers.Request('GET', 'http://baz/buzzle')
1136
1123
handler.set_proxy(request, 'http')
1126
def assertEvaluateProxyBypass(self, expected, host, no_proxy):
1127
handler = _urllib2_wrappers.ProxyHandler()
1128
self.assertEqual(expected,
1129
handler.evaluate_proxy_bypass(host, no_proxy))
1139
1131
def test_empty_user(self):
1140
self._install_env({'http_proxy': 'http://bar.com'})
1141
request = self._proxied_request()
1142
self.assertFalse(request.headers.has_key('Proxy-authorization'))
1132
self.overrideEnv('http_proxy', 'http://bar.com')
1133
request = self._proxied_request()
1134
self.assertFalse('Proxy-authorization' in request.headers)
1136
def test_user_with_at(self):
1137
self.overrideEnv('http_proxy',
1138
'http://username@domain:password@proxy_host:1234')
1139
request = self._proxied_request()
1140
self.assertFalse('Proxy-authorization' in request.headers)
1144
1142
def test_invalid_proxy(self):
1145
1143
"""A proxy env variable without scheme"""
1146
self._install_env({'http_proxy': 'host:1234'})
1144
self.overrideEnv('http_proxy', 'host:1234')
1147
1145
self.assertRaises(errors.InvalidURL, self._proxied_request)
1147
def test_evaluate_proxy_bypass_true(self):
1148
"""The host is not proxied"""
1149
self.assertEvaluateProxyBypass(True, 'example.com', 'example.com')
1150
self.assertEvaluateProxyBypass(True, 'bzr.example.com', '*example.com')
1152
def test_evaluate_proxy_bypass_false(self):
1153
"""The host is proxied"""
1154
self.assertEvaluateProxyBypass(False, 'bzr.example.com', None)
1156
def test_evaluate_proxy_bypass_unknown(self):
1157
"""The host is not explicitly proxied"""
1158
self.assertEvaluateProxyBypass(None, 'example.com', 'not.example.com')
1159
self.assertEvaluateProxyBypass(None, 'bzr.example.com', 'example.com')
1161
def test_evaluate_proxy_bypass_empty_entries(self):
1162
"""Ignore empty entries"""
1163
self.assertEvaluateProxyBypass(None, 'example.com', '')
1164
self.assertEvaluateProxyBypass(None, 'example.com', ',')
1165
self.assertEvaluateProxyBypass(None, 'example.com', 'foo,,bar')
1150
1168
class TestProxyHttpServer(http_utils.TestCaseWithTwoWebservers):
1151
1169
"""Tests proxy server.
1156
1174
to the file names).
1177
scenarios = multiply_scenarios(
1178
vary_by_http_client_implementation(),
1179
vary_by_http_protocol_version(),
1159
1182
# FIXME: We don't have an https server available, so we don't
1160
# test https connections.
1183
# test https connections. --vila toolongago
1162
1185
def setUp(self):
1163
1186
super(TestProxyHttpServer, self).setUp()
1187
self.transport_secondary_server = http_utils.ProxyServer
1164
1188
self.build_tree_contents([('foo', 'contents of foo\n'),
1165
1189
('foo-proxied', 'proxied contents of foo\n')])
1166
1190
# Let's setup some attributes for tests
1167
self.server = self.get_readonly_server()
1168
self.proxy_address = '%s:%d' % (self.server.host, self.server.port)
1169
if self._testing_pycurl():
1170
# Oh my ! pycurl does not check for the port as part of
1171
# no_proxy :-( So we just test the host part
1172
self.no_proxy_host = self.server.host
1174
self.no_proxy_host = self.proxy_address
1191
server = self.get_readonly_server()
1192
self.server_host_port = '%s:%d' % (server.host, server.port)
1193
self.no_proxy_host = self.server_host_port
1175
1194
# The secondary server is the proxy
1176
self.proxy = self.get_secondary_server()
1177
self.proxy_url = self.proxy.get_url()
1180
def _testing_pycurl(self):
1181
# TODO: This is duplicated for lots of the classes in this file
1182
return (features.pycurl.available()
1183
and self._transport == PyCurlTransport)
1185
def create_transport_secondary_server(self):
1186
"""Creates an http server that will serve files with
1187
'-proxied' appended to their names.
1189
return http_utils.ProxyServer(protocol_version=self._protocol_version)
1191
def _install_env(self, env):
1192
for name, value in env.iteritems():
1193
self._old_env[name] = osutils.set_or_unset_env(name, value)
1195
def _restore_env(self):
1196
for name, value in self._old_env.iteritems():
1197
osutils.set_or_unset_env(name, value)
1199
def proxied_in_env(self, env):
1200
self._install_env(env)
1201
url = self.server.get_url()
1202
t = self._transport(url)
1204
self.assertEqual('proxied contents of foo\n', t.get('foo').read())
1208
def not_proxied_in_env(self, env):
1209
self._install_env(env)
1210
url = self.server.get_url()
1211
t = self._transport(url)
1213
self.assertEqual('contents of foo\n', t.get('foo').read())
1195
self.proxy_url = self.get_secondary_url()
1197
def assertProxied(self):
1198
t = self.get_readonly_transport()
1199
self.assertEqual('proxied contents of foo\n', t.get('foo').read())
1201
def assertNotProxied(self):
1202
t = self.get_readonly_transport()
1203
self.assertEqual('contents of foo\n', t.get('foo').read())
1217
1205
def test_http_proxy(self):
1218
self.proxied_in_env({'http_proxy': self.proxy_url})
1206
self.overrideEnv('http_proxy', self.proxy_url)
1207
self.assertProxied()
1220
1209
def test_HTTP_PROXY(self):
1221
if self._testing_pycurl():
1222
# pycurl does not check HTTP_PROXY for security reasons
1223
# (for use in a CGI context that we do not care
1224
# about. Should we ?)
1225
raise tests.TestNotApplicable(
1226
'pycurl does not check HTTP_PROXY for security reasons')
1227
self.proxied_in_env({'HTTP_PROXY': self.proxy_url})
1210
self.overrideEnv('HTTP_PROXY', self.proxy_url)
1211
self.assertProxied()
1229
1213
def test_all_proxy(self):
1230
self.proxied_in_env({'all_proxy': self.proxy_url})
1214
self.overrideEnv('all_proxy', self.proxy_url)
1215
self.assertProxied()
1232
1217
def test_ALL_PROXY(self):
1233
self.proxied_in_env({'ALL_PROXY': self.proxy_url})
1218
self.overrideEnv('ALL_PROXY', self.proxy_url)
1219
self.assertProxied()
1235
1221
def test_http_proxy_with_no_proxy(self):
1236
self.not_proxied_in_env({'http_proxy': self.proxy_url,
1237
'no_proxy': self.no_proxy_host})
1222
self.overrideEnv('no_proxy', self.no_proxy_host)
1223
self.overrideEnv('http_proxy', self.proxy_url)
1224
self.assertNotProxied()
1239
1226
def test_HTTP_PROXY_with_NO_PROXY(self):
1240
if self._testing_pycurl():
1241
raise tests.TestNotApplicable(
1242
'pycurl does not check HTTP_PROXY for security reasons')
1243
self.not_proxied_in_env({'HTTP_PROXY': self.proxy_url,
1244
'NO_PROXY': self.no_proxy_host})
1227
self.overrideEnv('NO_PROXY', self.no_proxy_host)
1228
self.overrideEnv('HTTP_PROXY', self.proxy_url)
1229
self.assertNotProxied()
1246
1231
def test_all_proxy_with_no_proxy(self):
1247
self.not_proxied_in_env({'all_proxy': self.proxy_url,
1248
'no_proxy': self.no_proxy_host})
1232
self.overrideEnv('no_proxy', self.no_proxy_host)
1233
self.overrideEnv('all_proxy', self.proxy_url)
1234
self.assertNotProxied()
1250
1236
def test_ALL_PROXY_with_NO_PROXY(self):
1251
self.not_proxied_in_env({'ALL_PROXY': self.proxy_url,
1252
'NO_PROXY': self.no_proxy_host})
1237
self.overrideEnv('NO_PROXY', self.no_proxy_host)
1238
self.overrideEnv('ALL_PROXY', self.proxy_url)
1239
self.assertNotProxied()
1254
1241
def test_http_proxy_without_scheme(self):
1255
if self._testing_pycurl():
1256
# pycurl *ignores* invalid proxy env variables. If that ever change
1257
# in the future, this test will fail indicating that pycurl do not
1258
# ignore anymore such variables.
1259
self.not_proxied_in_env({'http_proxy': self.proxy_address})
1261
self.assertRaises(errors.InvalidURL,
1262
self.proxied_in_env,
1263
{'http_proxy': self.proxy_address})
1242
self.overrideEnv('http_proxy', self.server_host_port)
1243
self.assertRaises(errors.InvalidURL, self.assertProxied)
1266
1246
class TestRanges(http_utils.TestCaseWithWebserver):
1267
1247
"""Test the Range header in GET methods."""
1249
scenarios = multiply_scenarios(
1250
vary_by_http_client_implementation(),
1251
vary_by_http_protocol_version(),
1269
1254
def setUp(self):
1270
http_utils.TestCaseWithWebserver.setUp(self)
1255
super(TestRanges, self).setUp()
1271
1256
self.build_tree_contents([('a', '0123456789')],)
1272
server = self.get_readonly_server()
1273
self.transport = self._transport(server.get_url())
1275
1258
def create_transport_readonly_server(self):
1276
1259
return http_server.HttpServer(protocol_version=self._protocol_version)
1278
1261
def _file_contents(self, relpath, ranges):
1262
t = self.get_readonly_transport()
1279
1263
offsets = [ (start, end - start + 1) for start, end in ranges]
1280
coalesce = self.transport._coalesce_offsets
1264
coalesce = t._coalesce_offsets
1281
1265
coalesced = list(coalesce(offsets, limit=0, fudge_factor=0))
1282
code, data = self.transport._get(relpath, coalesced)
1266
code, data = t._get(relpath, coalesced)
1283
1267
self.assertTrue(code in (200, 206),'_get returns: %d' % code)
1284
1268
for start, end in ranges:
1285
1269
data.seek(start)
1286
1270
yield data.read(end - start + 1)
1288
1272
def _file_tail(self, relpath, tail_amount):
1289
code, data = self.transport._get(relpath, [], tail_amount)
1273
t = self.get_readonly_transport()
1274
code, data = t._get(relpath, [], tail_amount)
1290
1275
self.assertTrue(code in (200, 206),'_get returns: %d' % code)
1291
1276
data.seek(-tail_amount, 2)
1292
1277
return data.read(tail_amount)
1294
1279
def test_range_header(self):
1296
map(self.assertEqual,['0', '234'],
1297
list(self._file_contents('a', [(0,0), (2,4)])),)
1282
['0', '234'], list(self._file_contents('a', [(0,0), (2,4)])))
1299
1284
def test_range_header_tail(self):
1300
1285
self.assertEqual('789', self._file_tail('a', 3))
1642
1633
# Only one 'Authentication Required' error should occur
1643
1634
self.assertEqual(1, self.server.auth_required_errors)
1645
def test_user_from_auth_conf(self):
1646
if self._testing_pycurl():
1647
raise tests.TestNotApplicable(
1648
'pycurl does not support authentication.conf')
1651
self.server.add_user(user, password)
1652
# Create a minimal config file with the right password
1653
conf = config.AuthenticationConfig()
1654
conf._get_config().update(
1655
{'httptest': {'scheme': 'http', 'port': self.server.port,
1656
'user': user, 'password': password}})
1658
t = self.get_user_transport(None, None)
1659
# Issue a request to the server to connect
1660
self.assertEqual('contents of a\n', t.get('a').read())
1661
# Only one 'Authentication Required' error should occur
1662
self.assertEqual(1, self.server.auth_required_errors)
1664
1636
def test_changing_nonce(self):
1665
1637
if self._auth_server not in (http_utils.HTTPDigestAuthServer,
1666
1638
http_utils.ProxyDigestAuthServer):
1667
1639
raise tests.TestNotApplicable('HTTP/proxy auth digest only test')
1668
if self._testing_pycurl():
1669
raise tests.KnownFailure(
1670
'pycurl does not handle a nonce change')
1671
1640
self.server.add_user('joe', 'foo')
1672
1641
t = self.get_user_transport('joe', 'foo')
1673
1642
self.assertEqual('contents of a\n', t.get('a').read())
1682
1651
# initial 'who are you' and a second 'who are you' with the new nonce)
1683
1652
self.assertEqual(2, self.server.auth_required_errors)
1654
def test_user_from_auth_conf(self):
1657
self.server.add_user(user, password)
1658
_setup_authentication_config(scheme='http', port=self.server.port,
1659
user=user, password=password)
1660
t = self.get_user_transport(None, None)
1661
# Issue a request to the server to connect
1662
self.assertEqual('contents of a\n', t.get('a').read())
1663
# Only one 'Authentication Required' error should occur
1664
self.assertEqual(1, self.server.auth_required_errors)
1666
def test_no_credential_leaks_in_log(self):
1667
self.overrideAttr(debug, 'debug_flags', {'http'})
1669
password = 'very-sensitive-password'
1670
self.server.add_user(user, password)
1671
t = self.get_user_transport(user, password)
1672
# Capture the debug calls to mutter
1675
lines = args[0] % args[1:]
1676
# Some calls output multiple lines, just split them now since we
1677
# care about a single one later.
1678
self.mutters.extend(lines.splitlines())
1679
self.overrideAttr(trace, 'mutter', mutter)
1680
# Issue a request to the server to connect
1681
self.assertEqual(True, t.has('a'))
1682
# Only one 'Authentication Required' error should occur
1683
self.assertEqual(1, self.server.auth_required_errors)
1684
# Since the authentification succeeded, there should be a corresponding
1686
sent_auth_headers = [line for line in self.mutters
1687
if line.startswith('> %s' % (self._auth_header,))]
1688
self.assertLength(1, sent_auth_headers)
1689
self.assertStartsWith(sent_auth_headers[0],
1690
'> %s: <masked>' % (self._auth_header,))
1687
1693
class TestProxyAuth(TestAuth):
1688
"""Test proxy authentication schemes."""
1690
_auth_header = 'Proxy-authorization'
1691
_password_prompt_prefix = 'Proxy '
1692
_username_prompt_prefix = 'Proxy '
1694
"""Test proxy authentication schemes.
1696
This inherits from TestAuth to tweak the setUp and filter some failing
1700
scenarios = multiply_scenarios(
1701
vary_by_http_client_implementation(),
1702
vary_by_http_protocol_version(),
1703
vary_by_http_proxy_auth_scheme(),
1694
1706
def setUp(self):
1695
1707
super(TestProxyAuth, self).setUp()
1697
self.addCleanup(self._restore_env)
1698
1708
# Override the contents to avoid false positives
1699
1709
self.build_tree_contents([('a', 'not proxied contents of a\n'),
1700
1710
('b', 'not proxied contents of b\n'),
1863
1874
r = t._redirected_to('http://www.example.com/foo',
1864
1875
'http://foo.example.com/foo/subdir')
1865
1876
self.assertIsInstance(r, type(t))
1877
self.assertEqual('http://foo.example.com/foo/subdir/',
1867
1880
def test_redirected_to_same_host_sibling_protocol(self):
1868
1881
t = self._transport('http://www.example.com/foo')
1869
1882
r = t._redirected_to('http://www.example.com/foo',
1870
1883
'https://www.example.com/foo')
1871
1884
self.assertIsInstance(r, type(t))
1885
self.assertEqual('https://www.example.com/foo/',
1873
1888
def test_redirected_to_same_host_different_protocol(self):
1874
1889
t = self._transport('http://www.example.com/foo')
1875
1890
r = t._redirected_to('http://www.example.com/foo',
1876
1891
'ftp://www.example.com/foo')
1877
self.assertNotEquals(type(r), type(t))
1892
self.assertNotEqual(type(r), type(t))
1893
self.assertEqual('ftp://www.example.com/foo/', r.external_url())
1895
def test_redirected_to_same_host_specific_implementation(self):
1896
t = self._transport('http://www.example.com/foo')
1897
r = t._redirected_to('http://www.example.com/foo',
1898
'https+urllib://www.example.com/foo')
1899
self.assertEqual('https://www.example.com/foo/', r.external_url())
1879
1901
def test_redirected_to_different_host_same_user(self):
1880
1902
t = self._transport('http://joe@www.example.com/foo')
1881
1903
r = t._redirected_to('http://www.example.com/foo',
1882
1904
'https://foo.example.com/foo')
1883
1905
self.assertIsInstance(r, type(t))
1884
self.assertEqual(t._user, r._user)
1906
self.assertEqual(t._parsed_url.user, r._parsed_url.user)
1907
self.assertEqual('https://joe@foo.example.com/foo/', r.external_url())
1887
1910
class PredefinedRequestHandler(http_server.TestingHTTPRequestHandler):
2095
2114
class TestActivity(tests.TestCase, TestActivityMixin):
2116
scenarios = multiply_scenarios(
2117
vary_by_http_activity(),
2118
vary_by_http_protocol_version(),
2097
2121
def setUp(self):
2098
tests.TestCase.setUp(self)
2099
self.server = self._activity_server(self._protocol_version)
2100
self.server.start_server()
2101
self.activities = {}
2102
def report_activity(t, bytes, direction):
2103
count = self.activities.get(direction, 0)
2105
self.activities[direction] = count
2107
# We override at class level because constructors may propagate the
2108
# bound method and render instance overriding ineffective (an
2109
# alternative would be to define a specific ui factory instead...)
2110
self.orig_report_activity = self._transport._report_activity
2111
self._transport._report_activity = report_activity
2114
self._transport._report_activity = self.orig_report_activity
2115
self.server.stop_server()
2116
tests.TestCase.tearDown(self)
2122
super(TestActivity, self).setUp()
2123
TestActivityMixin.setUp(self)
2119
2126
class TestNoReportActivity(tests.TestCase, TestActivityMixin):
2128
# Unlike TestActivity, we are really testing ReportingFileSocket and
2129
# ReportingSocket, so we don't need all the parametrization. Since
2130
# ReportingFileSocket and ReportingSocket are wrappers, it's easier to
2131
# test them through their use by the transport than directly (that's a
2132
# bit less clean but far more simpler and effective).
2133
_activity_server = ActivityHTTPServer
2134
_protocol_version = 'HTTP/1.1'
2121
2136
def setUp(self):
2122
tests.TestCase.setUp(self)
2123
# Unlike TestActivity, we are really testing ReportingFileSocket and
2124
# ReportingSocket, so we don't need all the parametrization. Since
2125
# ReportingFileSocket and ReportingSocket are wrappers, it's easier to
2126
# test them through their use by the transport than directly (that's a
2127
# bit less clean but far more simpler and effective).
2128
self.server = ActivityHTTPServer('HTTP/1.1')
2129
self._transport=_urllib.HttpTransport_urllib
2131
self.server.start_server()
2133
# We override at class level because constructors may propagate the
2134
# bound method and render instance overriding ineffective (an
2135
# alternative would be to define a specific ui factory instead...)
2136
self.orig_report_activity = self._transport._report_activity
2137
self._transport._report_activity = None
2140
self._transport._report_activity = self.orig_report_activity
2141
self.server.stop_server()
2142
tests.TestCase.tearDown(self)
2137
super(TestNoReportActivity, self).setUp()
2138
self._transport =_urllib.HttpTransport_urllib
2139
TestActivityMixin.setUp(self)
2144
2141
def assertActivitiesMatch(self):
2145
2142
# Nothing to check here
2176
2164
self.new_server.port)
2177
2165
self.old_server.redirections = [
2178
2166
('(.*)', r'%s/1\1' % (new_prefix), 301),]
2179
self.old_transport = self._transport(self.old_server.get_url())
2167
self.old_transport = self.get_old_transport()
2180
2168
self.new_server.add_user('joe', 'foo')
2182
def get_a(self, transport):
2183
return transport.get('a')
2169
cleanup_http_redirection_connections(self)
2171
def create_transport_readonly_server(self):
2172
server = self._auth_server(protocol_version=self._protocol_version)
2173
server._url_protocol = self._url_protocol
2185
2179
def test_auth_on_redirected_via_do_catching_redirections(self):
2186
2180
self.redirections = 0
2188
def redirected(transport, exception, redirection_notice):
2182
def redirected(t, exception, redirection_notice):
2189
2183
self.redirections += 1
2190
dir, file = urlutils.split(exception.target)
2191
return self._transport(dir)
2184
redirected_t = t._redirected_to(exception.source, exception.target)
2185
self.addCleanup(redirected_t.disconnect)
2193
stdout = tests.StringIOWrapper()
2194
stderr = tests.StringIOWrapper()
2195
ui.ui_factory = tests.TestUIFactory(stdin='joe\nfoo\n',
2196
stdout=stdout, stderr=stderr)
2188
ui.ui_factory = tests.TestUIFactory(stdin='joe\nfoo\n')
2197
2189
self.assertEqual('redirected once',
2198
2190
transport.do_catching_redirections(
2199
2191
self.get_a, self.old_transport, redirected).read())