20
20
transport implementation, http protocol versions and authentication schemes.
23
# TODO: Should be renamed to bzrlib.transport.http.tests?
24
# TODO: What about renaming to bzrlib.tests.transport.http ?
23
# TODO: Should be renamed to breezy.transport.http.tests?
24
# TODO: What about renaming to breezy.tests.transport.http ?
26
from cStringIO import StringIO
30
import SimpleHTTPServer
27
from http.client import UnknownProtocol, parse_headers
28
from http.server import SimpleHTTPRequestHandler
29
except ImportError: # python < 3
30
from httplib import UnknownProtocol
31
from SimpleHTTPServer import SimpleHTTPRequestHandler
41
remote as _mod_remote,
47
from bzrlib.symbol_versioning import (
51
remote as _mod_remote,
50
from bzrlib.tests import (
53
from ..sixish import PY3
55
from bzrlib.transport import (
60
from .scenarios import (
61
load_tests_apply_scenarios,
64
from ..transport import (
59
from bzrlib.transport.http import (
68
from ..transport.http import (
65
if features.pycurl.available():
66
from bzrlib.transport.http._pycurl import PyCurlTransport
69
def load_tests(standard_tests, module, loader):
70
"""Multiply tests for http clients and protocol versions."""
71
result = loader.suiteClass()
73
# one for each transport implementation
74
t_tests, remaining_tests = tests.split_suite_by_condition(
75
standard_tests, tests.condition_isinstance((
76
TestHttpTransportRegistration,
77
TestHttpTransportUrls,
73
load_tests = load_tests_apply_scenarios
76
def vary_by_http_client_implementation():
77
"""Test the libraries we can use, currently just urllib."""
80
78
transport_scenarios = [
81
('urllib', dict(_transport=_urllib.HttpTransport_urllib,
82
_server=http_server.HttpServer_urllib,
83
_qualified_prefix='http+urllib',)),
85
if features.pycurl.available():
86
transport_scenarios.append(
87
('pycurl', dict(_transport=PyCurlTransport,
88
_server=http_server.HttpServer_PyCurl,
89
_qualified_prefix='http+pycurl',)))
90
tests.multiply_tests(t_tests, transport_scenarios, result)
92
# each implementation tested with each HTTP version
93
tp_tests, remaining_tests = tests.split_suite_by_condition(
94
remaining_tests, tests.condition_isinstance((
95
SmartHTTPTunnellingTest,
96
TestDoCatchRedirections,
99
TestHTTPSilentRedirections,
100
TestLimitedRangeRequestServer,
104
TestSpecificRequestHandler,
106
protocol_scenarios = [
107
('HTTP/1.0', dict(_protocol_version='HTTP/1.0')),
108
('HTTP/1.1', dict(_protocol_version='HTTP/1.1')),
110
tp_scenarios = tests.multiply_scenarios(transport_scenarios,
112
tests.multiply_tests(tp_tests, tp_scenarios, result)
114
# proxy auth: each auth scheme on all http versions on all implementations.
115
tppa_tests, remaining_tests = tests.split_suite_by_condition(
116
remaining_tests, tests.condition_isinstance((
119
proxy_auth_scheme_scenarios = [
120
('basic', dict(_auth_server=http_utils.ProxyBasicAuthServer)),
121
('digest', dict(_auth_server=http_utils.ProxyDigestAuthServer)),
123
dict(_auth_server=http_utils.ProxyBasicAndDigestAuthServer)),
125
tppa_scenarios = tests.multiply_scenarios(tp_scenarios,
126
proxy_auth_scheme_scenarios)
127
tests.multiply_tests(tppa_tests, tppa_scenarios, result)
129
# auth: each auth scheme on all http versions on all implementations.
130
tpa_tests, remaining_tests = tests.split_suite_by_condition(
131
remaining_tests, tests.condition_isinstance((
134
auth_scheme_scenarios = [
79
('urllib', dict(_transport=HttpTransport,
80
_server=http_server.HttpServer,
81
_url_protocol='http',)),
83
return transport_scenarios
86
def vary_by_http_protocol_version():
87
"""Test on http/1.0 and 1.1"""
89
('HTTP/1.0', dict(_protocol_version='HTTP/1.0')),
90
('HTTP/1.1', dict(_protocol_version='HTTP/1.1')),
94
def vary_by_http_auth_scheme():
135
96
('basic', dict(_auth_server=http_utils.HTTPBasicAuthServer)),
136
97
('digest', dict(_auth_server=http_utils.HTTPDigestAuthServer)),
138
dict(_auth_server=http_utils.HTTPBasicAndDigestAuthServer)),
140
tpa_scenarios = tests.multiply_scenarios(tp_scenarios,
141
auth_scheme_scenarios)
142
tests.multiply_tests(tpa_tests, tpa_scenarios, result)
144
# activity: on all http[s] versions on all implementations
145
tpact_tests, remaining_tests = tests.split_suite_by_condition(
146
remaining_tests, tests.condition_isinstance((
99
dict(_auth_server=http_utils.HTTPBasicAndDigestAuthServer)),
101
# Add some attributes common to all scenarios
102
for scenario_id, scenario_dict in scenarios:
103
scenario_dict.update(_auth_header='Authorization',
104
_username_prompt_prefix='',
105
_password_prompt_prefix='')
109
def vary_by_http_proxy_auth_scheme():
111
('proxy-basic', dict(_auth_server=http_utils.ProxyBasicAuthServer)),
112
('proxy-digest', dict(_auth_server=http_utils.ProxyDigestAuthServer)),
113
('proxy-basicdigest',
114
dict(_auth_server=http_utils.ProxyBasicAndDigestAuthServer)),
116
# Add some attributes common to all scenarios
117
for scenario_id, scenario_dict in scenarios:
118
scenario_dict.update(_auth_header='Proxy-Authorization',
119
_username_prompt_prefix='Proxy ',
120
_password_prompt_prefix='Proxy ')
124
def vary_by_http_activity():
149
125
activity_scenarios = [
150
126
('urllib,http', dict(_activity_server=ActivityHTTPServer,
151
_transport=_urllib.HttpTransport_urllib,)),
127
_transport=HttpTransport,)),
153
if tests.HTTPSServerFeature.available():
129
if features.HTTPSServerFeature.available():
130
# FIXME: Until we have a better way to handle self-signed certificates
131
# (like allowing them in a test specific authentication.conf for
132
# example), we need some specialized urllib transport for tests.
138
class HTTPS_transport(HttpTransport):
140
def __init__(self, base, _from_transport=None):
141
super(HTTPS_transport, self).__init__(
142
base, _from_transport=_from_transport,
143
ca_certs=ssl_certs.build_path('ca.crt'))
154
145
activity_scenarios.append(
155
146
('urllib,https', dict(_activity_server=ActivityHTTPSServer,
156
_transport=_urllib.HttpTransport_urllib,)),)
157
if features.pycurl.available():
158
activity_scenarios.append(
159
('pycurl,http', dict(_activity_server=ActivityHTTPServer,
160
_transport=PyCurlTransport,)),)
161
if tests.HTTPSServerFeature.available():
162
from bzrlib.tests import (
165
# FIXME: Until we have a better way to handle self-signed
166
# certificates (like allowing them in a test specific
167
# authentication.conf for example), we need some specialized pycurl
168
# transport for tests.
169
class HTTPS_pycurl_transport(PyCurlTransport):
171
def __init__(self, base, _from_transport=None):
172
super(HTTPS_pycurl_transport, self).__init__(
173
base, _from_transport)
174
self.cabundle = str(ssl_certs.build_path('ca.crt'))
176
activity_scenarios.append(
177
('pycurl,https', dict(_activity_server=ActivityHTTPSServer,
178
_transport=HTTPS_pycurl_transport,)),)
180
tpact_scenarios = tests.multiply_scenarios(activity_scenarios,
182
tests.multiply_tests(tpact_tests, tpact_scenarios, result)
184
# No parametrization for the remaining tests
185
result.addTests(remaining_tests)
147
_transport=HTTPS_transport,)),)
148
return activity_scenarios
190
151
class FakeManager(object):
366
368
server = http_server.HttpServer(RequestHandlerOneOne,
367
369
protocol_version='HTTP/1.0')
368
370
self.start_server(server)
369
self.assertIsInstance(server._httpd,
371
self.assertIsInstance(server.server,
370
372
http_server.TestingHTTPServer)
373
class TestWithTransport_pycurl(object):
374
"""Test case to inherit from if pycurl is present"""
376
def _get_pycurl_maybe(self):
377
self.requireFeature(features.pycurl)
378
return PyCurlTransport
380
_transport = property(_get_pycurl_maybe)
383
class TestHttpUrls(tests.TestCase):
385
# TODO: This should be moved to authorization tests once they
388
def test_url_parsing(self):
390
url = http.extract_auth('http://example.com', f)
391
self.assertEqual('http://example.com', url)
392
self.assertEqual(0, len(f.credentials))
393
url = http.extract_auth(
394
'http://user:pass@example.com/bzr/bzr.dev', f)
395
self.assertEqual('http://example.com/bzr/bzr.dev', url)
396
self.assertEqual(1, len(f.credentials))
397
self.assertEqual([None, 'example.com', 'user', 'pass'],
401
375
class TestHttpTransportUrls(tests.TestCase):
402
376
"""Test the http urls."""
378
scenarios = vary_by_http_client_implementation()
404
380
def test_abs_url(self):
405
381
"""Construction of absolute http URLs"""
406
t = self._transport('http://bazaar-vcs.org/bzr/bzr.dev/')
382
t = self._transport('http://example.com/bzr/bzr.dev/')
407
383
eq = self.assertEqualDiff
408
eq(t.abspath('.'), 'http://bazaar-vcs.org/bzr/bzr.dev')
409
eq(t.abspath('foo/bar'), 'http://bazaar-vcs.org/bzr/bzr.dev/foo/bar')
410
eq(t.abspath('.bzr'), 'http://bazaar-vcs.org/bzr/bzr.dev/.bzr')
384
eq(t.abspath('.'), 'http://example.com/bzr/bzr.dev')
385
eq(t.abspath('foo/bar'), 'http://example.com/bzr/bzr.dev/foo/bar')
386
eq(t.abspath('.bzr'), 'http://example.com/bzr/bzr.dev/.bzr')
411
387
eq(t.abspath('.bzr/1//2/./3'),
412
'http://bazaar-vcs.org/bzr/bzr.dev/.bzr/1/2/3')
388
'http://example.com/bzr/bzr.dev/.bzr/1/2/3')
414
390
def test_invalid_http_urls(self):
415
391
"""Trap invalid construction of urls"""
416
t = self._transport('http://bazaar-vcs.org/bzr/bzr.dev/')
417
self.assertRaises(errors.InvalidURL,
392
self._transport('http://example.com/bzr/bzr.dev/')
393
self.assertRaises(urlutils.InvalidURL,
419
'http://http://bazaar-vcs.org/bzr/bzr.dev/')
395
'http://example.com:port/bzr/bzr.dev/')
421
397
def test_http_root_urls(self):
422
398
"""Construction of URLs from server root"""
423
t = self._transport('http://bzr.ozlabs.org/')
399
t = self._transport('http://example.com/')
424
400
eq = self.assertEqualDiff
425
401
eq(t.abspath('.bzr/tree-version'),
426
'http://bzr.ozlabs.org/.bzr/tree-version')
402
'http://example.com/.bzr/tree-version')
428
404
def test_http_impl_urls(self):
429
405
"""There are servers which ask for particular clients to connect"""
431
407
server.start_server()
433
409
url = server.get_url()
434
self.assertTrue(url.startswith('%s://' % self._qualified_prefix))
410
self.assertTrue(url.startswith('%s://' % self._url_protocol))
436
412
server.stop_server()
439
class TestHttps_pycurl(TestWithTransport_pycurl, tests.TestCase):
441
# TODO: This should really be moved into another pycurl
442
# specific test. When https tests will be implemented, take
443
# this one into account.
444
def test_pycurl_without_https_support(self):
445
"""Test that pycurl without SSL do not fail with a traceback.
447
For the purpose of the test, we force pycurl to ignore
448
https by supplying a fake version_info that do not
451
self.requireFeature(features.pycurl)
452
# Import the module locally now that we now it's available.
453
pycurl = features.pycurl.module
455
self.overrideAttr(pycurl, 'version_info',
456
# Fake the pycurl version_info This was taken from
457
# a windows pycurl without SSL (thanks to bialix)
466
('ftp', 'gopher', 'telnet',
467
'dict', 'ldap', 'http', 'file'),
471
self.assertRaises(errors.DependencyNotPresent, self._transport,
472
'https://launchpad.net')
475
415
class TestHTTPConnections(http_utils.TestCaseWithWebserver):
476
416
"""Test the http connections."""
418
scenarios = multiply_scenarios(
419
vary_by_http_client_implementation(),
420
vary_by_http_protocol_version(),
479
http_utils.TestCaseWithWebserver.setUp(self)
424
super(TestHTTPConnections, self).setUp()
480
425
self.build_tree(['foo/', 'foo/bar'], line_endings='binary',
481
426
transport=self.get_transport())
483
428
def test_http_has(self):
484
429
server = self.get_readonly_server()
485
t = self._transport(server.get_url())
430
t = self.get_readonly_transport()
486
431
self.assertEqual(t.has('foo/bar'), True)
487
432
self.assertEqual(len(server.logs), 1)
488
433
self.assertContainsRe(server.logs[0],
489
r'"HEAD /foo/bar HTTP/1.." (200|302) - "-" "bzr/')
434
r'"HEAD /foo/bar HTTP/1.." (200|302) - "-" "Breezy/')
491
436
def test_http_has_not_found(self):
492
437
server = self.get_readonly_server()
493
t = self._transport(server.get_url())
438
t = self.get_readonly_transport()
494
439
self.assertEqual(t.has('not-found'), False)
495
440
self.assertContainsRe(server.logs[1],
496
r'"HEAD /not-found HTTP/1.." 404 - "-" "bzr/')
441
r'"HEAD /not-found HTTP/1.." 404 - "-" "Breezy/')
498
443
def test_http_get(self):
499
444
server = self.get_readonly_server()
500
t = self._transport(server.get_url())
445
t = self.get_readonly_transport()
501
446
fp = t.get('foo/bar')
502
447
self.assertEqualDiff(
504
'contents of foo/bar\n')
449
b'contents of foo/bar\n')
505
450
self.assertEqual(len(server.logs), 1)
506
451
self.assertTrue(server.logs[0].find(
507
'"GET /foo/bar HTTP/1.1" 200 - "-" "bzr/%s'
508
% bzrlib.__version__) > -1)
452
'"GET /foo/bar HTTP/1.1" 200 - "-" "Breezy/%s'
453
% breezy.__version__) > -1)
510
455
def test_has_on_bogus_host(self):
511
456
# Get a free address and don't 'accept' on it, so that we
525
470
class TestHttpTransportRegistration(tests.TestCase):
526
471
"""Test registrations of various http implementations"""
473
scenarios = vary_by_http_client_implementation()
528
475
def test_http_registered(self):
529
t = transport.get_transport('%s://foo.com/' % self._qualified_prefix)
476
t = transport.get_transport_from_url(
477
'%s://foo.com/' % self._url_protocol)
530
478
self.assertIsInstance(t, transport.Transport)
531
479
self.assertIsInstance(t, self._transport)
534
482
class TestPost(tests.TestCase):
484
scenarios = multiply_scenarios(
485
vary_by_http_client_implementation(),
486
vary_by_http_protocol_version(),
536
489
def test_post_body_is_received(self):
537
server = RecordingServer(expect_body_tail='end-of-body',
538
scheme=self._qualified_prefix)
490
server = RecordingServer(expect_body_tail=b'end-of-body',
491
scheme=self._url_protocol)
539
492
self.start_server(server)
540
493
url = server.get_url()
541
http_transport = self._transport(url)
542
code, response = http_transport._post('abc def end-of-body')
544
server.received_bytes.startswith('POST /.bzr/smart HTTP/1.'))
545
self.assertTrue('content-length: 19\r' in server.received_bytes.lower())
494
# FIXME: needs a cleanup -- vila 20100611
495
http_transport = transport.get_transport_from_url(url)
496
code, response = http_transport._post(b'abc def end-of-body')
498
server.received_bytes.startswith(b'POST /.bzr/smart HTTP/1.'))
500
b'content-length: 19\r' in server.received_bytes.lower())
501
self.assertTrue(b'content-type: application/octet-stream\r'
502
in server.received_bytes.lower())
546
503
# The transport should not be assuming that the server can accept
547
504
# chunked encoding the first time it connects, because HTTP/1.1, so we
548
505
# check for the literal string.
550
server.received_bytes.endswith('\r\n\r\nabc def end-of-body'))
507
server.received_bytes.endswith(b'\r\n\r\nabc def end-of-body'))
553
510
class TestRangeHeader(tests.TestCase):
554
511
"""Test range_header method"""
556
513
def check_header(self, value, ranges=[], tail=0):
557
offsets = [ (start, end - start + 1) for start, end in ranges]
514
offsets = [(start, end - start + 1) for start, end in ranges]
558
515
coalesce = transport.Transport._coalesce_offsets
559
516
coalesced = list(coalesce(offsets, limit=0, fudge_factor=0))
560
range_header = http.HttpTransportBase._range_header
517
range_header = http.HttpTransport._range_header
561
518
self.assertEqual(value, range_header(coalesced, tail))
563
520
def test_range_header_single(self):
564
self.check_header('0-9', ranges=[(0,9)])
565
self.check_header('100-109', ranges=[(100,109)])
521
self.check_header('0-9', ranges=[(0, 9)])
522
self.check_header('100-109', ranges=[(100, 109)])
567
524
def test_range_header_tail(self):
568
525
self.check_header('-10', tail=10)
791
741
super(TestRangeRequestServer, self).setUp()
792
self.build_tree_contents([('a', '0123456789')],)
742
self.build_tree_contents([('a', b'0123456789')],)
794
744
def test_readv(self):
795
server = self.get_readonly_server()
796
t = self._transport(server.get_url())
745
t = self.get_readonly_transport()
797
746
l = list(t.readv('a', ((0, 1), (1, 1), (3, 2), (9, 1))))
798
self.assertEqual(l[0], (0, '0'))
799
self.assertEqual(l[1], (1, '1'))
800
self.assertEqual(l[2], (3, '34'))
801
self.assertEqual(l[3], (9, '9'))
747
self.assertEqual(l[0], (0, b'0'))
748
self.assertEqual(l[1], (1, b'1'))
749
self.assertEqual(l[2], (3, b'34'))
750
self.assertEqual(l[3], (9, b'9'))
803
752
def test_readv_out_of_order(self):
804
server = self.get_readonly_server()
805
t = self._transport(server.get_url())
753
t = self.get_readonly_transport()
806
754
l = list(t.readv('a', ((1, 1), (9, 1), (0, 1), (3, 2))))
807
self.assertEqual(l[0], (1, '1'))
808
self.assertEqual(l[1], (9, '9'))
809
self.assertEqual(l[2], (0, '0'))
810
self.assertEqual(l[3], (3, '34'))
755
self.assertEqual(l[0], (1, b'1'))
756
self.assertEqual(l[1], (9, b'9'))
757
self.assertEqual(l[2], (0, b'0'))
758
self.assertEqual(l[3], (3, b'34'))
812
760
def test_readv_invalid_ranges(self):
813
server = self.get_readonly_server()
814
t = self._transport(server.get_url())
761
t = self.get_readonly_transport()
816
763
# This is intentionally reading off the end of the file
817
764
# since we are sure that it cannot get there
818
765
self.assertListRaises((errors.InvalidRange, errors.ShortReadvError,),
819
t.readv, 'a', [(1,1), (8,10)])
766
t.readv, 'a', [(1, 1), (8, 10)])
821
768
# This is trying to seek past the end of the file, it should
822
769
# also raise a special error
823
770
self.assertListRaises((errors.InvalidRange, errors.ShortReadvError,),
824
t.readv, 'a', [(12,2)])
771
t.readv, 'a', [(12, 2)])
826
773
def test_readv_multiple_get_requests(self):
827
774
server = self.get_readonly_server()
828
t = self._transport(server.get_url())
775
t = self.get_readonly_transport()
829
776
# force transport to issue multiple requests
830
777
t._max_readv_combine = 1
831
778
t._max_get_ranges = 1
832
779
l = list(t.readv('a', ((0, 1), (1, 1), (3, 2), (9, 1))))
833
self.assertEqual(l[0], (0, '0'))
834
self.assertEqual(l[1], (1, '1'))
835
self.assertEqual(l[2], (3, '34'))
836
self.assertEqual(l[3], (9, '9'))
780
self.assertEqual(l[0], (0, b'0'))
781
self.assertEqual(l[1], (1, b'1'))
782
self.assertEqual(l[2], (3, b'34'))
783
self.assertEqual(l[3], (9, b'9'))
837
784
# The server should have issued 4 requests
838
785
self.assertEqual(4, server.GET_request_nb)
840
787
def test_readv_get_max_size(self):
841
788
server = self.get_readonly_server()
842
t = self._transport(server.get_url())
789
t = self.get_readonly_transport()
843
790
# force transport to issue multiple requests by limiting the number of
844
791
# bytes by request. Note that this apply to coalesced offsets only, a
845
792
# single range will keep its size even if bigger than the limit.
846
793
t._get_max_size = 2
847
794
l = list(t.readv('a', ((0, 1), (1, 1), (2, 4), (6, 4))))
848
self.assertEqual(l[0], (0, '0'))
849
self.assertEqual(l[1], (1, '1'))
850
self.assertEqual(l[2], (2, '2345'))
851
self.assertEqual(l[3], (6, '6789'))
795
self.assertEqual(l[0], (0, b'0'))
796
self.assertEqual(l[1], (1, b'1'))
797
self.assertEqual(l[2], (2, b'2345'))
798
self.assertEqual(l[3], (6, b'6789'))
852
799
# The server should have issued 3 requests
853
800
self.assertEqual(3, server.GET_request_nb)
855
802
def test_complete_readv_leave_pipe_clean(self):
856
803
server = self.get_readonly_server()
857
t = self._transport(server.get_url())
804
t = self.get_readonly_transport()
858
805
# force transport to issue multiple requests
859
806
t._get_max_size = 2
860
l = list(t.readv('a', ((0, 1), (1, 1), (2, 4), (6, 4))))
807
list(t.readv('a', ((0, 1), (1, 1), (2, 4), (6, 4))))
861
808
# The server should have issued 3 requests
862
809
self.assertEqual(3, server.GET_request_nb)
863
self.assertEqual('0123456789', t.get_bytes('a'))
810
self.assertEqual(b'0123456789', t.get_bytes('a'))
864
811
self.assertEqual(4, server.GET_request_nb)
866
813
def test_incomplete_readv_leave_pipe_clean(self):
867
814
server = self.get_readonly_server()
868
t = self._transport(server.get_url())
815
t = self.get_readonly_transport()
869
816
# force transport to issue multiple requests
870
817
t._get_max_size = 2
871
818
# Don't collapse readv results into a list so that we leave unread
872
819
# bytes on the socket
873
820
ireadv = iter(t.readv('a', ((0, 1), (1, 1), (2, 4), (6, 4))))
874
self.assertEqual((0, '0'), ireadv.next())
821
self.assertEqual((0, b'0'), next(ireadv))
875
822
# The server should have issued one request so far
876
823
self.assertEqual(1, server.GET_request_nb)
877
self.assertEqual('0123456789', t.get_bytes('a'))
824
self.assertEqual(b'0123456789', t.get_bytes('a'))
878
825
# get_bytes issued an additional request, the readv pending ones are
880
827
self.assertEqual(2, server.GET_request_nb)
1018
967
def setUp(self):
1019
968
super(TestTruncatedMultipleRangeServer, self).setUp()
1020
self.build_tree_contents([('a', '0123456789')],)
969
self.build_tree_contents([('a', b'0123456789')],)
1022
971
def test_readv_with_short_reads(self):
1023
972
server = self.get_readonly_server()
1024
t = self._transport(server.get_url())
973
t = self.get_readonly_transport()
1025
974
# Force separate ranges for each offset
1026
975
t._bytes_to_read_before_seek = 0
1027
976
ireadv = iter(t.readv('a', ((0, 1), (2, 1), (4, 2), (9, 1))))
1028
self.assertEqual((0, '0'), ireadv.next())
1029
self.assertEqual((2, '2'), ireadv.next())
1030
if not self._testing_pycurl():
1031
# Only one request have been issued so far (except for pycurl that
1032
# try to read the whole response at once)
1033
self.assertEqual(1, server.GET_request_nb)
1034
self.assertEqual((4, '45'), ireadv.next())
1035
self.assertEqual((9, '9'), ireadv.next())
1036
# Both implementations issue 3 requests but:
1037
# - urllib does two multiple (4 ranges, then 2 ranges) then a single
1039
# - pycurl does two multiple (4 ranges, 4 ranges) then a single range
977
self.assertEqual((0, b'0'), next(ireadv))
978
self.assertEqual((2, b'2'), next(ireadv))
979
# Only one request have been issued so far
980
self.assertEqual(1, server.GET_request_nb)
981
self.assertEqual((4, b'45'), next(ireadv))
982
self.assertEqual((9, b'9'), next(ireadv))
983
# We issue 3 requests: two multiple (4 ranges, then 2 ranges) then a
1040
985
self.assertEqual(3, server.GET_request_nb)
1041
986
# Finally the client have tried a single range request and stays in
1043
988
self.assertEqual('single', t._range_hint)
991
class TruncatedBeforeBoundaryRequestHandler(
992
http_server.TestingHTTPRequestHandler):
993
"""Truncation before a boundary, like in bug 198646"""
995
_truncated_ranges = 1
997
def get_multiple_ranges(self, file, file_size, ranges):
998
self.send_response(206)
999
self.send_header('Accept-Ranges', 'bytes')
1001
self.send_header('Content-Type',
1002
'multipart/byteranges; boundary=%s' % boundary)
1003
boundary_line = b'--%s\r\n' % boundary.encode('ascii')
1004
# Calculate the Content-Length
1006
for (start, end) in ranges:
1007
content_length += len(boundary_line)
1008
content_length += self._header_line_length(
1009
'Content-type', 'application/octet-stream')
1010
content_length += self._header_line_length(
1011
'Content-Range', 'bytes %d-%d/%d' % (start, end, file_size))
1012
content_length += len('\r\n') # end headers
1013
content_length += end - start # + 1
1014
content_length += len(boundary_line)
1015
self.send_header('Content-length', content_length)
1018
# Send the multipart body
1020
for (start, end) in ranges:
1021
if cur + self._truncated_ranges >= len(ranges):
1022
# Abruptly ends the response and close the connection
1023
self.close_connection = 1
1025
self.wfile.write(boundary_line)
1026
self.send_header('Content-type', 'application/octet-stream')
1027
self.send_header('Content-Range', 'bytes %d-%d/%d'
1028
% (start, end, file_size))
1030
self.send_range_content(file, start, end - start + 1)
1033
self.wfile.write(boundary_line)
1036
class TestTruncatedBeforeBoundary(TestSpecificRequestHandler):
1037
"""Tests the case of bug 198646, disconnecting before a boundary."""
1039
_req_handler_class = TruncatedBeforeBoundaryRequestHandler
1042
super(TestTruncatedBeforeBoundary, self).setUp()
1043
self.build_tree_contents([('a', b'0123456789')],)
1045
def test_readv_with_short_reads(self):
1046
server = self.get_readonly_server()
1047
t = self.get_readonly_transport()
1048
# Force separate ranges for each offset
1049
t._bytes_to_read_before_seek = 0
1050
ireadv = iter(t.readv('a', ((0, 1), (2, 1), (4, 2), (9, 1))))
1051
self.assertEqual((0, b'0'), next(ireadv))
1052
self.assertEqual((2, b'2'), next(ireadv))
1053
self.assertEqual((4, b'45'), next(ireadv))
1054
self.assertEqual((9, b'9'), next(ireadv))
1045
1057
class LimitedRangeRequestHandler(http_server.TestingHTTPRequestHandler):
1046
1058
"""Errors out when range specifiers exceed the limit"""
1078
1095
return LimitedRangeHTTPServer(range_limit=self.range_limit,
1079
1096
protocol_version=self._protocol_version)
1081
def get_transport(self):
1082
return self._transport(self.get_readonly_server().get_url())
1084
1098
def setUp(self):
1085
http_utils.TestCaseWithWebserver.setUp(self)
1099
super(TestLimitedRangeRequestServer, self).setUp()
1086
1100
# We need to manipulate ranges that correspond to real chunks in the
1087
1101
# response, so we build a content appropriately.
1088
filler = ''.join(['abcdefghij' for x in range(102)])
1089
content = ''.join(['%04d' % v + filler for v in range(16)])
1102
filler = b''.join([b'abcdefghij' for x in range(102)])
1103
content = b''.join([b'%04d' % v + filler for v in range(16)])
1090
1104
self.build_tree_contents([('a', content)],)
1092
1106
def test_few_ranges(self):
1093
t = self.get_transport()
1107
t = self.get_readonly_transport()
1094
1108
l = list(t.readv('a', ((0, 4), (1024, 4), )))
1095
self.assertEqual(l[0], (0, '0000'))
1096
self.assertEqual(l[1], (1024, '0001'))
1109
self.assertEqual(l[0], (0, b'0000'))
1110
self.assertEqual(l[1], (1024, b'0001'))
1097
1111
self.assertEqual(1, self.get_readonly_server().GET_request_nb)
1099
1113
def test_more_ranges(self):
1100
t = self.get_transport()
1114
t = self.get_readonly_transport()
1101
1115
l = list(t.readv('a', ((0, 4), (1024, 4), (4096, 4), (8192, 4))))
1102
self.assertEqual(l[0], (0, '0000'))
1103
self.assertEqual(l[1], (1024, '0001'))
1104
self.assertEqual(l[2], (4096, '0004'))
1105
self.assertEqual(l[3], (8192, '0008'))
1116
self.assertEqual(l[0], (0, b'0000'))
1117
self.assertEqual(l[1], (1024, b'0001'))
1118
self.assertEqual(l[2], (4096, b'0004'))
1119
self.assertEqual(l[3], (8192, b'0008'))
1106
1120
# The server will refuse to serve the first request (too much ranges),
1107
1121
# a second request will succeed.
1108
1122
self.assertEqual(2, self.get_readonly_server().GET_request_nb)
1114
1128
Only the urllib implementation is tested here.
1118
tests.TestCase.setUp(self)
1123
tests.TestCase.tearDown(self)
1125
def _install_env(self, env):
1126
for name, value in env.iteritems():
1127
self._old_env[name] = osutils.set_or_unset_env(name, value)
1129
def _restore_env(self):
1130
for name, value in self._old_env.iteritems():
1131
osutils.set_or_unset_env(name, value)
1133
1131
def _proxied_request(self):
1134
handler = _urllib2_wrappers.ProxyHandler()
1135
request = _urllib2_wrappers.Request('GET','http://baz/buzzle')
1132
handler = http.ProxyHandler()
1133
request = http.Request('GET', 'http://baz/buzzle')
1136
1134
handler.set_proxy(request, 'http')
1137
def assertEvaluateProxyBypass(self, expected, host, no_proxy):
1138
handler = http.ProxyHandler()
1139
self.assertEqual(expected,
1140
handler.evaluate_proxy_bypass(host, no_proxy))
1139
1142
def test_empty_user(self):
1140
self._install_env({'http_proxy': 'http://bar.com'})
1141
request = self._proxied_request()
1142
self.assertFalse(request.headers.has_key('Proxy-authorization'))
1143
self.overrideEnv('http_proxy', 'http://bar.com')
1144
request = self._proxied_request()
1145
self.assertFalse('Proxy-authorization' in request.headers)
1147
def test_user_with_at(self):
1148
self.overrideEnv('http_proxy',
1149
'http://username@domain:password@proxy_host:1234')
1150
request = self._proxied_request()
1151
self.assertFalse('Proxy-authorization' in request.headers)
1144
1153
def test_invalid_proxy(self):
1145
1154
"""A proxy env variable without scheme"""
1146
self._install_env({'http_proxy': 'host:1234'})
1147
self.assertRaises(errors.InvalidURL, self._proxied_request)
1155
self.overrideEnv('http_proxy', 'host:1234')
1156
self.assertRaises(urlutils.InvalidURL, self._proxied_request)
1158
def test_evaluate_proxy_bypass_true(self):
1159
"""The host is not proxied"""
1160
self.assertEvaluateProxyBypass(True, 'example.com', 'example.com')
1161
self.assertEvaluateProxyBypass(True, 'bzr.example.com', '*example.com')
1163
def test_evaluate_proxy_bypass_false(self):
1164
"""The host is proxied"""
1165
self.assertEvaluateProxyBypass(False, 'bzr.example.com', None)
1167
def test_evaluate_proxy_bypass_unknown(self):
1168
"""The host is not explicitly proxied"""
1169
self.assertEvaluateProxyBypass(None, 'example.com', 'not.example.com')
1170
self.assertEvaluateProxyBypass(None, 'bzr.example.com', 'example.com')
1172
def test_evaluate_proxy_bypass_empty_entries(self):
1173
"""Ignore empty entries"""
1174
self.assertEvaluateProxyBypass(None, 'example.com', '')
1175
self.assertEvaluateProxyBypass(None, 'example.com', ',')
1176
self.assertEvaluateProxyBypass(None, 'example.com', 'foo,,bar')
1150
1179
class TestProxyHttpServer(http_utils.TestCaseWithTwoWebservers):
1156
1185
to the file names).
1188
scenarios = multiply_scenarios(
1189
vary_by_http_client_implementation(),
1190
vary_by_http_protocol_version(),
1159
1193
# FIXME: We don't have an https server available, so we don't
1160
# test https connections.
1194
# test https connections. --vila toolongago
1162
1196
def setUp(self):
1163
1197
super(TestProxyHttpServer, self).setUp()
1164
self.build_tree_contents([('foo', 'contents of foo\n'),
1165
('foo-proxied', 'proxied contents of foo\n')])
1198
self.transport_secondary_server = http_utils.ProxyServer
1199
self.build_tree_contents([('foo', b'contents of foo\n'),
1200
('foo-proxied', b'proxied contents of foo\n')])
1166
1201
# Let's setup some attributes for tests
1167
self.server = self.get_readonly_server()
1168
self.proxy_address = '%s:%d' % (self.server.host, self.server.port)
1169
if self._testing_pycurl():
1170
# Oh my ! pycurl does not check for the port as part of
1171
# no_proxy :-( So we just test the host part
1172
self.no_proxy_host = self.server.host
1174
self.no_proxy_host = self.proxy_address
1202
server = self.get_readonly_server()
1203
self.server_host_port = '%s:%d' % (server.host, server.port)
1204
self.no_proxy_host = self.server_host_port
1175
1205
# The secondary server is the proxy
1176
self.proxy = self.get_secondary_server()
1177
self.proxy_url = self.proxy.get_url()
1180
def _testing_pycurl(self):
1181
# TODO: This is duplicated for lots of the classes in this file
1182
return (features.pycurl.available()
1183
and self._transport == PyCurlTransport)
1185
def create_transport_secondary_server(self):
1186
"""Creates an http server that will serve files with
1187
'-proxied' appended to their names.
1189
return http_utils.ProxyServer(protocol_version=self._protocol_version)
1191
def _install_env(self, env):
1192
for name, value in env.iteritems():
1193
self._old_env[name] = osutils.set_or_unset_env(name, value)
1195
def _restore_env(self):
1196
for name, value in self._old_env.iteritems():
1197
osutils.set_or_unset_env(name, value)
1199
def proxied_in_env(self, env):
1200
self._install_env(env)
1201
url = self.server.get_url()
1202
t = self._transport(url)
1204
self.assertEqual('proxied contents of foo\n', t.get('foo').read())
1208
def not_proxied_in_env(self, env):
1209
self._install_env(env)
1210
url = self.server.get_url()
1211
t = self._transport(url)
1213
self.assertEqual('contents of foo\n', t.get('foo').read())
1206
self.proxy_url = self.get_secondary_url()
1208
def assertProxied(self):
1209
t = self.get_readonly_transport()
1210
self.assertEqual(b'proxied contents of foo\n', t.get('foo').read())
1212
def assertNotProxied(self):
1213
t = self.get_readonly_transport()
1214
self.assertEqual(b'contents of foo\n', t.get('foo').read())
1217
1216
def test_http_proxy(self):
1218
self.proxied_in_env({'http_proxy': self.proxy_url})
1217
self.overrideEnv('http_proxy', self.proxy_url)
1218
self.assertProxied()
1220
1220
def test_HTTP_PROXY(self):
1221
if self._testing_pycurl():
1222
# pycurl does not check HTTP_PROXY for security reasons
1223
# (for use in a CGI context that we do not care
1224
# about. Should we ?)
1225
raise tests.TestNotApplicable(
1226
'pycurl does not check HTTP_PROXY for security reasons')
1227
self.proxied_in_env({'HTTP_PROXY': self.proxy_url})
1221
self.overrideEnv('HTTP_PROXY', self.proxy_url)
1222
self.assertProxied()
1229
1224
def test_all_proxy(self):
1230
self.proxied_in_env({'all_proxy': self.proxy_url})
1225
self.overrideEnv('all_proxy', self.proxy_url)
1226
self.assertProxied()
1232
1228
def test_ALL_PROXY(self):
1233
self.proxied_in_env({'ALL_PROXY': self.proxy_url})
1229
self.overrideEnv('ALL_PROXY', self.proxy_url)
1230
self.assertProxied()
1235
1232
def test_http_proxy_with_no_proxy(self):
1236
self.not_proxied_in_env({'http_proxy': self.proxy_url,
1237
'no_proxy': self.no_proxy_host})
1233
self.overrideEnv('no_proxy', self.no_proxy_host)
1234
self.overrideEnv('http_proxy', self.proxy_url)
1235
self.assertNotProxied()
1239
1237
def test_HTTP_PROXY_with_NO_PROXY(self):
1240
if self._testing_pycurl():
1241
raise tests.TestNotApplicable(
1242
'pycurl does not check HTTP_PROXY for security reasons')
1243
self.not_proxied_in_env({'HTTP_PROXY': self.proxy_url,
1244
'NO_PROXY': self.no_proxy_host})
1238
self.overrideEnv('NO_PROXY', self.no_proxy_host)
1239
self.overrideEnv('HTTP_PROXY', self.proxy_url)
1240
self.assertNotProxied()
1246
1242
def test_all_proxy_with_no_proxy(self):
1247
self.not_proxied_in_env({'all_proxy': self.proxy_url,
1248
'no_proxy': self.no_proxy_host})
1243
self.overrideEnv('no_proxy', self.no_proxy_host)
1244
self.overrideEnv('all_proxy', self.proxy_url)
1245
self.assertNotProxied()
1250
1247
def test_ALL_PROXY_with_NO_PROXY(self):
1251
self.not_proxied_in_env({'ALL_PROXY': self.proxy_url,
1252
'NO_PROXY': self.no_proxy_host})
1248
self.overrideEnv('NO_PROXY', self.no_proxy_host)
1249
self.overrideEnv('ALL_PROXY', self.proxy_url)
1250
self.assertNotProxied()
1254
1252
def test_http_proxy_without_scheme(self):
1255
if self._testing_pycurl():
1256
# pycurl *ignores* invalid proxy env variables. If that ever change
1257
# in the future, this test will fail indicating that pycurl do not
1258
# ignore anymore such variables.
1259
self.not_proxied_in_env({'http_proxy': self.proxy_address})
1261
self.assertRaises(errors.InvalidURL,
1262
self.proxied_in_env,
1263
{'http_proxy': self.proxy_address})
1253
self.overrideEnv('http_proxy', self.server_host_port)
1254
self.assertRaises(urlutils.InvalidURL, self.assertProxied)
1266
1257
class TestRanges(http_utils.TestCaseWithWebserver):
1267
1258
"""Test the Range header in GET methods."""
1260
scenarios = multiply_scenarios(
1261
vary_by_http_client_implementation(),
1262
vary_by_http_protocol_version(),
1269
1265
def setUp(self):
1270
http_utils.TestCaseWithWebserver.setUp(self)
1271
self.build_tree_contents([('a', '0123456789')],)
1272
server = self.get_readonly_server()
1273
self.transport = self._transport(server.get_url())
1266
super(TestRanges, self).setUp()
1267
self.build_tree_contents([('a', b'0123456789')],)
1275
1269
def create_transport_readonly_server(self):
1276
1270
return http_server.HttpServer(protocol_version=self._protocol_version)
1278
1272
def _file_contents(self, relpath, ranges):
1279
offsets = [ (start, end - start + 1) for start, end in ranges]
1280
coalesce = self.transport._coalesce_offsets
1273
t = self.get_readonly_transport()
1274
offsets = [(start, end - start + 1) for start, end in ranges]
1275
coalesce = t._coalesce_offsets
1281
1276
coalesced = list(coalesce(offsets, limit=0, fudge_factor=0))
1282
code, data = self.transport._get(relpath, coalesced)
1283
self.assertTrue(code in (200, 206),'_get returns: %d' % code)
1277
code, data = t._get(relpath, coalesced)
1278
self.assertTrue(code in (200, 206), '_get returns: %d' % code)
1284
1279
for start, end in ranges:
1285
1280
data.seek(start)
1286
1281
yield data.read(end - start + 1)
1288
1283
def _file_tail(self, relpath, tail_amount):
1289
code, data = self.transport._get(relpath, [], tail_amount)
1290
self.assertTrue(code in (200, 206),'_get returns: %d' % code)
1284
t = self.get_readonly_transport()
1285
code, data = t._get(relpath, [], tail_amount)
1286
self.assertTrue(code in (200, 206), '_get returns: %d' % code)
1291
1287
data.seek(-tail_amount, 2)
1292
1288
return data.read(tail_amount)
1294
1290
def test_range_header(self):
1296
map(self.assertEqual,['0', '234'],
1297
list(self._file_contents('a', [(0,0), (2,4)])),)
1293
[b'0', b'234'], list(self._file_contents('a', [(0, 0), (2, 4)])))
1299
1295
def test_range_header_tail(self):
1300
self.assertEqual('789', self._file_tail('a', 3))
1296
self.assertEqual(b'789', self._file_tail('a', 3))
1302
1298
def test_syntactically_invalid_range_header(self):
1303
1299
self.assertListRaises(errors.InvalidHttpRange,
1304
self._file_contents, 'a', [(4, 3)])
1300
self._file_contents, 'a', [(4, 3)])
1306
1302
def test_semantically_invalid_range_header(self):
1307
1303
self.assertListRaises(errors.InvalidHttpRange,
1308
self._file_contents, 'a', [(42, 128)])
1304
self._file_contents, 'a', [(42, 128)])
1311
1307
class TestHTTPRedirections(http_utils.TestCaseWithRedirectedWebserver):
1312
1308
"""Test redirection between http servers."""
1314
def create_transport_secondary_server(self):
1315
"""Create the secondary server redirecting to the primary server"""
1316
new = self.get_readonly_server()
1318
redirecting = http_utils.HTTPServerRedirecting(
1319
protocol_version=self._protocol_version)
1320
redirecting.redirect_to(new.host, new.port)
1310
scenarios = multiply_scenarios(
1311
vary_by_http_client_implementation(),
1312
vary_by_http_protocol_version(),
1323
1315
def setUp(self):
1324
1316
super(TestHTTPRedirections, self).setUp()
1325
self.build_tree_contents([('a', '0123456789'),
1317
self.build_tree_contents([('a', b'0123456789'),
1327
'# Bazaar revision bundle v0.9\n#\n')
1319
b'# Bazaar revision bundle v0.9\n#\n')
1329
# The requests to the old server will be redirected to the new server
1330
self.old_transport = self._transport(self.old_server.get_url())
1332
1322
def test_redirected(self):
1333
self.assertRaises(errors.RedirectRequested, self.old_transport.get, 'a')
1334
t = self._transport(self.new_server.get_url())
1335
self.assertEqual('0123456789', t.get('a').read())
1337
def test_read_redirected_bundle_from_url(self):
1338
from bzrlib.bundle import read_bundle_from_url
1339
url = self.old_transport.abspath('bundle')
1340
bundle = self.applyDeprecated(deprecated_in((1, 12, 0)),
1341
read_bundle_from_url, url)
1342
# If read_bundle_from_url was successful we get an empty bundle
1343
self.assertEqual([], bundle.revisions)
1346
class RedirectedRequest(_urllib2_wrappers.Request):
1323
self.assertRaises(errors.RedirectRequested,
1324
self.get_old_transport().get, 'a')
1327
self.get_new_transport().get('a').read())
1330
class RedirectedRequest(http.Request):
1347
1331
"""Request following redirections. """
1349
init_orig = _urllib2_wrappers.Request.__init__
1333
init_orig = http.Request.__init__
1351
1335
def __init__(self, method, url, *args, **kwargs):
1352
1336
"""Constructor.
1355
1339
# Since the tests using this class will replace
1356
# _urllib2_wrappers.Request, we can't just call the base class __init__
1340
# http.Request, we can't just call the base class __init__
1357
1341
# or we'll loop.
1358
1342
RedirectedRequest.init_orig(self, method, url, *args, **kwargs)
1359
1343
self.follow_redirections = True
1362
1346
def install_redirected_request(test):
1363
test.overrideAttr(_urllib2_wrappers, 'Request', RedirectedRequest)
1347
test.overrideAttr(http, 'Request', RedirectedRequest)
1350
def cleanup_http_redirection_connections(test):
1351
# Some sockets are opened but never seen by _urllib, so we trap them at
1352
# the http level to be able to clean them up.
1353
def socket_disconnect(sock):
1355
sock.shutdown(socket.SHUT_RDWR)
1357
except socket.error:
1360
def connect(connection):
1361
test.http_connect_orig(connection)
1362
test.addCleanup(socket_disconnect, connection.sock)
1363
test.http_connect_orig = test.overrideAttr(
1364
http.HTTPConnection, 'connect', connect)
1366
def connect(connection):
1367
test.https_connect_orig(connection)
1368
test.addCleanup(socket_disconnect, connection.sock)
1369
test.https_connect_orig = test.overrideAttr(
1370
http.HTTPSConnection, 'connect', connect)
1366
1373
class TestHTTPSilentRedirections(http_utils.TestCaseWithRedirectedWebserver):
1369
1376
http implementations do not redirect silently anymore (they
1370
1377
do not redirect at all in fact). The mechanism is still in
1371
place at the _urllib2_wrappers.Request level and these tests
1378
place at the http.Request level and these tests
1374
For the pycurl implementation
1375
the redirection have been deleted as we may deprecate pycurl
1376
and I have no place to keep a working implementation.
1382
scenarios = multiply_scenarios(
1383
vary_by_http_client_implementation(),
1384
vary_by_http_protocol_version(),
1380
1387
def setUp(self):
1381
if (features.pycurl.available()
1382
and self._transport == PyCurlTransport):
1383
raise tests.TestNotApplicable(
1384
"pycurl doesn't redirect silently annymore")
1385
1388
super(TestHTTPSilentRedirections, self).setUp()
1386
1389
install_redirected_request(self)
1387
self.build_tree_contents([('a','a'),
1390
cleanup_http_redirection_connections(self)
1391
self.build_tree_contents([('a', b'a'),
1389
('1/a', 'redirected once'),
1393
('1/a', b'redirected once'),
1391
('2/a', 'redirected twice'),
1395
('2/a', b'redirected twice'),
1393
('3/a', 'redirected thrice'),
1397
('3/a', b'redirected thrice'),
1395
('4/a', 'redirected 4 times'),
1399
('4/a', b'redirected 4 times'),
1397
('5/a', 'redirected 5 times'),
1401
('5/a', b'redirected 5 times'),
1400
self.old_transport = self._transport(self.old_server.get_url())
1402
def create_transport_secondary_server(self):
1403
"""Create the secondary server, redirections are defined in the tests"""
1404
return http_utils.HTTPServerRedirecting(
1405
protocol_version=self._protocol_version)
1407
1404
def test_one_redirection(self):
1408
t = self.old_transport
1410
req = RedirectedRequest('GET', t.abspath('a'))
1405
t = self.get_old_transport()
1411
1406
new_prefix = 'http://%s:%s' % (self.new_server.host,
1412
1407
self.new_server.port)
1413
1408
self.old_server.redirections = \
1414
[('(.*)', r'%s/1\1' % (new_prefix), 301),]
1415
self.assertEqual('redirected once',t._perform(req).read())
1409
[('(.*)', r'%s/1\1' % (new_prefix), 301), ]
1412
t.request('GET', t._remote_path('a'), retries=1).read())
1417
1414
def test_five_redirections(self):
1418
t = self.old_transport
1420
req = RedirectedRequest('GET', t.abspath('a'))
1415
t = self.get_old_transport()
1421
1416
old_prefix = 'http://%s:%s' % (self.old_server.host,
1422
1417
self.old_server.port)
1423
1418
new_prefix = 'http://%s:%s' % (self.new_server.host,
1429
1424
('/4(.*)', r'%s/5\1' % (new_prefix), 301),
1430
1425
('(/[^/]+)', r'%s/1\1' % (old_prefix), 301),
1432
self.assertEqual('redirected 5 times',t._perform(req).read())
1428
b'redirected 5 times',
1429
t.request('GET', t._remote_path('a'), retries=6).read())
1435
1432
class TestDoCatchRedirections(http_utils.TestCaseWithRedirectedWebserver):
1436
1433
"""Test transport.do_catching_redirections."""
1435
scenarios = multiply_scenarios(
1436
vary_by_http_client_implementation(),
1437
vary_by_http_protocol_version(),
1438
1440
def setUp(self):
1439
1441
super(TestDoCatchRedirections, self).setUp()
1440
self.build_tree_contents([('a', '0123456789'),],)
1442
self.old_transport = self._transport(self.old_server.get_url())
1444
def get_a(self, transport):
1445
return transport.get('a')
1442
self.build_tree_contents([('a', b'0123456789'), ],)
1443
cleanup_http_redirection_connections(self)
1445
self.old_transport = self.get_old_transport()
1447
1450
def test_no_redirection(self):
1448
t = self._transport(self.new_server.get_url())
1451
t = self.get_new_transport()
1450
1453
# We use None for redirected so that we fail if redirected
1451
self.assertEqual('0123456789',
1454
self.assertEqual(b'0123456789',
1452
1455
transport.do_catching_redirections(
1453
self.get_a, t, None).read())
1456
self.get_a, t, None).read())
1455
1458
def test_one_redirection(self):
1456
1459
self.redirections = 0
1458
def redirected(transport, exception, redirection_notice):
1461
def redirected(t, exception, redirection_notice):
1459
1462
self.redirections += 1
1460
dir, file = urlutils.split(exception.target)
1461
return self._transport(dir)
1463
redirected_t = t._redirected_to(exception.source, exception.target)
1463
self.assertEqual('0123456789',
1466
self.assertEqual(b'0123456789',
1464
1467
transport.do_catching_redirections(
1465
self.get_a, self.old_transport, redirected).read())
1468
self.get_a, self.old_transport, redirected).read())
1466
1469
self.assertEqual(1, self.redirections)
1468
1471
def test_redirection_loop(self):
1613
1630
def _expected_username_prompt(self, scheme):
1614
1631
return (self._username_prompt_prefix
1615
1632
+ "%s %s:%d, Realm: '%s' username: " % (scheme.upper(),
1616
self.server.host, self.server.port,
1617
self.server.auth_realm))
1633
self.server.host, self.server.port,
1634
self.server.auth_realm))
1619
1636
def test_no_prompt_for_password_when_using_auth_config(self):
1620
if self._testing_pycurl():
1621
raise tests.TestNotApplicable(
1622
'pycurl does not support authentication.conf'
1623
' since it cannot prompt')
1626
1638
password = 'foo'
1627
1639
stdin_content = 'bar\n' # Not the right password
1628
1640
self.server.add_user(user, password)
1629
1641
t = self.get_user_transport(user, None)
1630
ui.ui_factory = tests.TestUIFactory(stdin=stdin_content,
1631
stderr=tests.StringIOWrapper())
1642
ui.ui_factory = tests.TestUIFactory(stdin=stdin_content)
1632
1643
# Create a minimal config file with the right password
1633
conf = config.AuthenticationConfig()
1634
conf._get_config().update(
1635
{'httptest': {'scheme': 'http', 'port': self.server.port,
1636
'user': user, 'password': password}})
1644
_setup_authentication_config(scheme='http', port=self.server.port,
1645
user=user, password=password)
1638
1646
# Issue a request to the server to connect
1639
self.assertEqual('contents of a\n',t.get('a').read())
1647
with t.get('a') as f:
1648
self.assertEqual(b'contents of a\n', f.read())
1640
1649
# stdin should have been left untouched
1641
1650
self.assertEqual(stdin_content, ui.ui_factory.stdin.readline())
1642
1651
# Only one 'Authentication Required' error should occur
1643
1652
self.assertEqual(1, self.server.auth_required_errors)
1645
def test_user_from_auth_conf(self):
1646
if self._testing_pycurl():
1647
raise tests.TestNotApplicable(
1648
'pycurl does not support authentication.conf')
1651
self.server.add_user(user, password)
1652
# Create a minimal config file with the right password
1653
conf = config.AuthenticationConfig()
1654
conf._get_config().update(
1655
{'httptest': {'scheme': 'http', 'port': self.server.port,
1656
'user': user, 'password': password}})
1658
t = self.get_user_transport(None, None)
1659
# Issue a request to the server to connect
1660
self.assertEqual('contents of a\n', t.get('a').read())
1661
# Only one 'Authentication Required' error should occur
1662
self.assertEqual(1, self.server.auth_required_errors)
1664
1654
def test_changing_nonce(self):
1665
1655
if self._auth_server not in (http_utils.HTTPDigestAuthServer,
1666
1656
http_utils.ProxyDigestAuthServer):
1667
1657
raise tests.TestNotApplicable('HTTP/proxy auth digest only test')
1668
if self._testing_pycurl():
1669
raise tests.KnownFailure(
1670
'pycurl does not handle a nonce change')
1671
1658
self.server.add_user('joe', 'foo')
1672
1659
t = self.get_user_transport('joe', 'foo')
1673
self.assertEqual('contents of a\n', t.get('a').read())
1674
self.assertEqual('contents of b\n', t.get('b').read())
1660
with t.get('a') as f:
1661
self.assertEqual(b'contents of a\n', f.read())
1662
with t.get('b') as f:
1663
self.assertEqual(b'contents of b\n', f.read())
1675
1664
# Only one 'Authentication Required' error should have
1676
1665
# occured so far
1677
1666
self.assertEqual(1, self.server.auth_required_errors)
1678
1667
# The server invalidates the current nonce
1679
1668
self.server.auth_nonce = self.server.auth_nonce + '. No, now!'
1680
self.assertEqual('contents of a\n', t.get('a').read())
1669
self.assertEqual(b'contents of a\n', t.get('a').read())
1681
1670
# Two 'Authentication Required' errors should occur (the
1682
1671
# initial 'who are you' and a second 'who are you' with the new nonce)
1683
1672
self.assertEqual(2, self.server.auth_required_errors)
1674
def test_user_from_auth_conf(self):
1677
self.server.add_user(user, password)
1678
_setup_authentication_config(scheme='http', port=self.server.port,
1679
user=user, password=password)
1680
t = self.get_user_transport(None, None)
1681
# Issue a request to the server to connect
1682
with t.get('a') as f:
1683
self.assertEqual(b'contents of a\n', f.read())
1684
# Only one 'Authentication Required' error should occur
1685
self.assertEqual(1, self.server.auth_required_errors)
1687
def test_no_credential_leaks_in_log(self):
1688
self.overrideAttr(debug, 'debug_flags', {'http'})
1690
password = 'very-sensitive-password'
1691
self.server.add_user(user, password)
1692
t = self.get_user_transport(user, password)
1693
# Capture the debug calls to mutter
1697
lines = args[0] % args[1:]
1698
# Some calls output multiple lines, just split them now since we
1699
# care about a single one later.
1700
self.mutters.extend(lines.splitlines())
1701
self.overrideAttr(trace, 'mutter', mutter)
1702
# Issue a request to the server to connect
1703
self.assertEqual(True, t.has('a'))
1704
# Only one 'Authentication Required' error should occur
1705
self.assertEqual(1, self.server.auth_required_errors)
1706
# Since the authentification succeeded, there should be a corresponding
1708
sent_auth_headers = [line for line in self.mutters
1709
if line.startswith('> %s' % (self._auth_header,))]
1710
self.assertLength(1, sent_auth_headers)
1711
self.assertStartsWith(sent_auth_headers[0],
1712
'> %s: <masked>' % (self._auth_header,))
1687
1715
class TestProxyAuth(TestAuth):
1688
"""Test proxy authentication schemes."""
1690
_auth_header = 'Proxy-authorization'
1691
_password_prompt_prefix = 'Proxy '
1692
_username_prompt_prefix = 'Proxy '
1716
"""Test proxy authentication schemes.
1718
This inherits from TestAuth to tweak the setUp and filter some failing
1722
scenarios = multiply_scenarios(
1723
vary_by_http_client_implementation(),
1724
vary_by_http_protocol_version(),
1725
vary_by_http_proxy_auth_scheme(),
1694
1728
def setUp(self):
1695
1729
super(TestProxyAuth, self).setUp()
1697
self.addCleanup(self._restore_env)
1698
1730
# Override the contents to avoid false positives
1699
self.build_tree_contents([('a', 'not proxied contents of a\n'),
1700
('b', 'not proxied contents of b\n'),
1701
('a-proxied', 'contents of a\n'),
1702
('b-proxied', 'contents of b\n'),
1731
self.build_tree_contents([('a', b'not proxied contents of a\n'),
1732
('b', b'not proxied contents of b\n'),
1733
('a-proxied', b'contents of a\n'),
1734
('b-proxied', b'contents of b\n'),
1705
1737
def get_user_transport(self, user, password):
1706
self._install_env({'all_proxy': self.get_user_url(user, password)})
1707
return self._transport(self.server.get_url())
1709
def _install_env(self, env):
1710
for name, value in env.iteritems():
1711
self._old_env[name] = osutils.set_or_unset_env(name, value)
1713
def _restore_env(self):
1714
for name, value in self._old_env.iteritems():
1715
osutils.set_or_unset_env(name, value)
1717
def test_empty_pass(self):
1718
if self._testing_pycurl():
1720
if pycurl.version_info()[1] < '7.16.0':
1721
raise tests.KnownFailure(
1722
'pycurl < 7.16.0 does not handle empty proxy passwords')
1723
super(TestProxyAuth, self).test_empty_pass()
1738
proxy_url = self.get_user_url(user, password)
1739
self.overrideEnv('all_proxy', proxy_url)
1740
return TestAuth.get_user_transport(self, user, password)
1743
class NonClosingBytesIO(io.BytesIO):
1746
"""Ignore and leave file open."""
1726
1749
class SampleSocket(object):
2087
2143
t = self.get_transport()
2088
2144
# We must send a single line of body bytes, see
2089
# PredefinedRequestHandler.handle_one_request
2090
code, f = t._post('abc def end-of-body\n')
2091
self.assertEqual('lalala whatever as long as itsssss\n', f.read())
2145
# PredefinedRequestHandler._handle_one_request
2146
code, f = t._post(b'abc def end-of-body\n')
2147
self.assertEqual(b'lalala whatever as long as itsssss\n', f.read())
2092
2148
self.assertActivitiesMatch()
2095
2151
class TestActivity(tests.TestCase, TestActivityMixin):
2153
scenarios = multiply_scenarios(
2154
vary_by_http_activity(),
2155
vary_by_http_protocol_version(),
2097
2158
def setUp(self):
2098
tests.TestCase.setUp(self)
2099
self.server = self._activity_server(self._protocol_version)
2100
self.server.start_server()
2101
self.activities = {}
2102
def report_activity(t, bytes, direction):
2103
count = self.activities.get(direction, 0)
2105
self.activities[direction] = count
2107
# We override at class level because constructors may propagate the
2108
# bound method and render instance overriding ineffective (an
2109
# alternative would be to define a specific ui factory instead...)
2110
self.orig_report_activity = self._transport._report_activity
2111
self._transport._report_activity = report_activity
2114
self._transport._report_activity = self.orig_report_activity
2115
self.server.stop_server()
2116
tests.TestCase.tearDown(self)
2159
super(TestActivity, self).setUp()
2160
TestActivityMixin.setUp(self)
2119
2163
class TestNoReportActivity(tests.TestCase, TestActivityMixin):
2165
# Unlike TestActivity, we are really testing ReportingFileSocket and
2166
# ReportingSocket, so we don't need all the parametrization. Since
2167
# ReportingFileSocket and ReportingSocket are wrappers, it's easier to
2168
# test them through their use by the transport than directly (that's a
2169
# bit less clean but far more simpler and effective).
2170
_activity_server = ActivityHTTPServer
2171
_protocol_version = 'HTTP/1.1'
2121
2173
def setUp(self):
2122
tests.TestCase.setUp(self)
2123
# Unlike TestActivity, we are really testing ReportingFileSocket and
2124
# ReportingSocket, so we don't need all the parametrization. Since
2125
# ReportingFileSocket and ReportingSocket are wrappers, it's easier to
2126
# test them through their use by the transport than directly (that's a
2127
# bit less clean but far more simpler and effective).
2128
self.server = ActivityHTTPServer('HTTP/1.1')
2129
self._transport=_urllib.HttpTransport_urllib
2131
self.server.start_server()
2133
# We override at class level because constructors may propagate the
2134
# bound method and render instance overriding ineffective (an
2135
# alternative would be to define a specific ui factory instead...)
2136
self.orig_report_activity = self._transport._report_activity
2137
self._transport._report_activity = None
2140
self._transport._report_activity = self.orig_report_activity
2141
self.server.stop_server()
2142
tests.TestCase.tearDown(self)
2174
super(TestNoReportActivity, self).setUp()
2175
self._transport = HttpTransport
2176
TestActivityMixin.setUp(self)
2144
2178
def assertActivitiesMatch(self):
2145
2179
# Nothing to check here
2149
2183
class TestAuthOnRedirected(http_utils.TestCaseWithRedirectedWebserver):
2150
2184
"""Test authentication on the redirected http server."""
2186
scenarios = vary_by_http_protocol_version()
2152
2188
_auth_header = 'Authorization'
2153
2189
_password_prompt_prefix = ''
2154
2190
_username_prompt_prefix = ''
2155
2191
_auth_server = http_utils.HTTPBasicAuthServer
2156
_transport = _urllib.HttpTransport_urllib
2158
def create_transport_readonly_server(self):
2159
return self._auth_server()
2161
def create_transport_secondary_server(self):
2162
"""Create the secondary server redirecting to the primary server"""
2163
new = self.get_readonly_server()
2165
redirecting = http_utils.HTTPServerRedirecting()
2166
redirecting.redirect_to(new.host, new.port)
2192
_transport = HttpTransport
2169
2194
def setUp(self):
2170
2195
super(TestAuthOnRedirected, self).setUp()
2171
self.build_tree_contents([('a','a'),
2196
self.build_tree_contents([('a', b'a'),
2173
('1/a', 'redirected once'),
2198
('1/a', b'redirected once'),
2175
2200
new_prefix = 'http://%s:%s' % (self.new_server.host,
2176
2201
self.new_server.port)
2177
2202
self.old_server.redirections = [
2178
('(.*)', r'%s/1\1' % (new_prefix), 301),]
2179
self.old_transport = self._transport(self.old_server.get_url())
2203
('(.*)', r'%s/1\1' % (new_prefix), 301), ]
2204
self.old_transport = self.get_old_transport()
2180
2205
self.new_server.add_user('joe', 'foo')
2182
def get_a(self, transport):
2183
return transport.get('a')
2206
cleanup_http_redirection_connections(self)
2208
def create_transport_readonly_server(self):
2209
server = self._auth_server(protocol_version=self._protocol_version)
2210
server._url_protocol = self._url_protocol
2185
2216
def test_auth_on_redirected_via_do_catching_redirections(self):
2186
2217
self.redirections = 0
2188
def redirected(transport, exception, redirection_notice):
2219
def redirected(t, exception, redirection_notice):
2189
2220
self.redirections += 1
2190
dir, file = urlutils.split(exception.target)
2191
return self._transport(dir)
2221
redirected_t = t._redirected_to(exception.source, exception.target)
2222
self.addCleanup(redirected_t.disconnect)
2193
stdout = tests.StringIOWrapper()
2194
stderr = tests.StringIOWrapper()
2195
ui.ui_factory = tests.TestUIFactory(stdin='joe\nfoo\n',
2196
stdout=stdout, stderr=stderr)
2197
self.assertEqual('redirected once',
2225
ui.ui_factory = tests.TestUIFactory(stdin='joe\nfoo\n')
2226
self.assertEqual(b'redirected once',
2198
2227
transport.do_catching_redirections(
2199
self.get_a, self.old_transport, redirected).read())
2228
self.get_a, self.old_transport, redirected).read())
2200
2229
self.assertEqual(1, self.redirections)
2201
2230
# stdin should be empty
2202
2231
self.assertEqual('', ui.ui_factory.stdin.readline())
2203
2232
# stdout should be empty, stderr will contains the prompts
2204
self.assertEqual('', stdout.getvalue())
2233
self.assertEqual('', ui.ui_factory.stdout.getvalue())
2206
2235
def test_auth_on_redirected_via_following_redirections(self):
2207
2236
self.new_server.add_user('joe', 'foo')
2208
stdout = tests.StringIOWrapper()
2209
stderr = tests.StringIOWrapper()
2210
ui.ui_factory = tests.TestUIFactory(stdin='joe\nfoo\n',
2211
stdout=stdout, stderr=stderr)
2237
ui.ui_factory = tests.TestUIFactory(stdin='joe\nfoo\n')
2212
2238
t = self.old_transport
2213
req = RedirectedRequest('GET', t.abspath('a'))
2214
2239
new_prefix = 'http://%s:%s' % (self.new_server.host,
2215
2240
self.new_server.port)
2216
2241
self.old_server.redirections = [
2217
('(.*)', r'%s/1\1' % (new_prefix), 301),]
2218
self.assertEqual('redirected once',t._perform(req).read())
2242
('(.*)', r'%s/1\1' % (new_prefix), 301), ]
2245
t.request('GET', t.abspath('a'), retries=3).read())
2219
2246
# stdin should be empty
2220
2247
self.assertEqual('', ui.ui_factory.stdin.readline())
2221
2248
# stdout should be empty, stderr will contains the prompts
2222
self.assertEqual('', stdout.getvalue())
2249
self.assertEqual('', ui.ui_factory.stdout.getvalue())