/brz/remove-bazaar

To get this branch, use:
bzr branch http://gegoxaren.bato24.eu/bzr/brz/remove-bazaar

« back to all changes in this revision

Viewing changes to bzrlib/tests/test_http.py

  • Committer: Martin Pool
  • Date: 2009-06-05 23:21:51 UTC
  • mfrom: (4415 +trunk)
  • mto: This revision was merged to the branch mainline in revision 4416.
  • Revision ID: mbp@sourcefrog.net-20090605232151-luwmyyl95siraqyz
merge trunk

Show diffs side-by-side

added

removed

Lines of Context:
12
12
#
13
13
# You should have received a copy of the GNU General Public License
14
14
# along with this program; if not, write to the Free Software
15
 
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
 
15
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
16
16
 
17
17
"""Tests for HTTP implementations.
18
18
 
113
113
                                            protocol_scenarios)
114
114
    tests.multiply_tests(tp_tests, tp_scenarios, result)
115
115
 
 
116
    # proxy auth: each auth scheme on all http versions on all implementations.
 
117
    tppa_tests, remaining_tests = tests.split_suite_by_condition(
 
118
        remaining_tests, tests.condition_isinstance((
 
119
                TestProxyAuth,
 
120
                )))
 
121
    proxy_auth_scheme_scenarios = [
 
122
        ('basic', dict(_auth_server=http_utils.ProxyBasicAuthServer)),
 
123
        ('digest', dict(_auth_server=http_utils.ProxyDigestAuthServer)),
 
124
        ('basicdigest',
 
125
         dict(_auth_server=http_utils.ProxyBasicAndDigestAuthServer)),
 
126
        ]
 
127
    tppa_scenarios = tests.multiply_scenarios(tp_scenarios,
 
128
                                              proxy_auth_scheme_scenarios)
 
129
    tests.multiply_tests(tppa_tests, tppa_scenarios, result)
 
130
 
116
131
    # auth: each auth scheme on all http versions on all implementations.
117
132
    tpa_tests, remaining_tests = tests.split_suite_by_condition(
118
133
        remaining_tests, tests.condition_isinstance((
119
134
                TestAuth,
120
135
                )))
121
136
    auth_scheme_scenarios = [
122
 
        ('basic', dict(_auth_scheme='basic')),
123
 
        ('digest', dict(_auth_scheme='digest')),
 
137
        ('basic', dict(_auth_server=http_utils.HTTPBasicAuthServer)),
 
138
        ('digest', dict(_auth_server=http_utils.HTTPDigestAuthServer)),
 
139
        ('basicdigest',
 
140
         dict(_auth_server=http_utils.HTTPBasicAndDigestAuthServer)),
124
141
        ]
125
142
    tpa_scenarios = tests.multiply_scenarios(tp_scenarios,
126
 
        auth_scheme_scenarios)
 
143
                                             auth_scheme_scenarios)
127
144
    tests.multiply_tests(tpa_tests, tpa_scenarios, result)
128
145
 
129
 
    # activity: activity on all http versions on all implementations
 
146
    # activity: on all http[s] versions on all implementations
130
147
    tpact_tests, remaining_tests = tests.split_suite_by_condition(
131
148
        remaining_tests, tests.condition_isinstance((
132
149
                TestActivity,
133
150
                )))
134
151
    activity_scenarios = [
135
 
        ('http', dict(_activity_server=ActivityHTTPServer)),
 
152
        ('urllib,http', dict(_activity_server=ActivityHTTPServer,
 
153
                             _transport=_urllib.HttpTransport_urllib,)),
136
154
        ]
137
155
    if tests.HTTPSServerFeature.available():
138
156
        activity_scenarios.append(
139
 
            ('https', dict(_activity_server=ActivityHTTPSServer)))
140
 
    tpact_scenarios = tests.multiply_scenarios(tp_scenarios,
141
 
        activity_scenarios)
 
157
            ('urllib,https', dict(_activity_server=ActivityHTTPSServer,
 
158
                                  _transport=_urllib.HttpTransport_urllib,)),)
 
159
    if pycurl_present:
 
160
        activity_scenarios.append(
 
161
            ('pycurl,http', dict(_activity_server=ActivityHTTPServer,
 
162
                                 _transport=PyCurlTransport,)),)
 
163
        if tests.HTTPSServerFeature.available():
 
164
            from bzrlib.tests import (
 
165
                ssl_certs,
 
166
                )
 
167
            # FIXME: Until we have a better way to handle self-signed
 
168
            # certificates (like allowing them in a test specific
 
169
            # authentication.conf for example), we need some specialized pycurl
 
170
            # transport for tests.
 
171
            class HTTPS_pycurl_transport(PyCurlTransport):
 
172
 
 
173
                def __init__(self, base, _from_transport=None):
 
174
                    super(HTTPS_pycurl_transport, self).__init__(
 
175
                        base, _from_transport)
 
176
                    self.cabundle = str(ssl_certs.build_path('ca.crt'))
 
177
 
 
178
            activity_scenarios.append(
 
179
                ('pycurl,https', dict(_activity_server=ActivityHTTPSServer,
 
180
                                      _transport=HTTPS_pycurl_transport,)),)
 
181
 
 
182
    tpact_scenarios = tests.multiply_scenarios(activity_scenarios,
 
183
                                               protocol_scenarios)
142
184
    tests.multiply_tests(tpact_tests, tpact_scenarios, result)
143
185
 
144
186
    # No parametrization for the remaining tests
215
257
 
216
258
class TestAuthHeader(tests.TestCase):
217
259
 
218
 
    def parse_header(self, header):
219
 
        ah =  _urllib2_wrappers.AbstractAuthHandler()
220
 
        return ah._parse_auth_header(header)
 
260
    def parse_header(self, header, auth_handler_class=None):
 
261
        if auth_handler_class is None:
 
262
            auth_handler_class = _urllib2_wrappers.AbstractAuthHandler
 
263
        self.auth_handler =  auth_handler_class()
 
264
        return self.auth_handler._parse_auth_header(header)
221
265
 
222
266
    def test_empty_header(self):
223
267
        scheme, remainder = self.parse_header('')
235
279
        self.assertEquals('basic', scheme)
236
280
        self.assertEquals('realm="Thou should not pass"', remainder)
237
281
 
 
282
    def test_basic_extract_realm(self):
 
283
        scheme, remainder = self.parse_header(
 
284
            'Basic realm="Thou should not pass"',
 
285
            _urllib2_wrappers.BasicAuthHandler)
 
286
        match, realm = self.auth_handler.extract_realm(remainder)
 
287
        self.assertTrue(match is not None)
 
288
        self.assertEquals('Thou should not pass', realm)
 
289
 
238
290
    def test_digest_header(self):
239
291
        scheme, remainder = self.parse_header(
240
292
            'Digest realm="Thou should not pass"')
1307
1359
        # Since the tests using this class will replace
1308
1360
        # _urllib2_wrappers.Request, we can't just call the base class __init__
1309
1361
        # or we'll loop.
1310
 
        RedirectedRequest.init_orig(self, method, url, args, kwargs)
 
1362
        RedirectedRequest.init_orig(self, method, url, *args, **kwargs)
1311
1363
        self.follow_redirections = True
1312
1364
 
1313
1365
 
1440
1492
 
1441
1493
    _auth_header = 'Authorization'
1442
1494
    _password_prompt_prefix = ''
 
1495
    _username_prompt_prefix = ''
 
1496
    # Set by load_tests
 
1497
    _auth_server = None
1443
1498
 
1444
1499
    def setUp(self):
1445
1500
        super(TestAuth, self).setUp()
1448
1503
                                  ('b', 'contents of b\n'),])
1449
1504
 
1450
1505
    def create_transport_readonly_server(self):
1451
 
        if self._auth_scheme == 'basic':
1452
 
            server = http_utils.HTTPBasicAuthServer(
1453
 
                protocol_version=self._protocol_version)
1454
 
        else:
1455
 
            if self._auth_scheme != 'digest':
1456
 
                raise AssertionError('Unknown auth scheme: %r'
1457
 
                                     % self._auth_scheme)
1458
 
            server = http_utils.HTTPDigestAuthServer(
1459
 
                protocol_version=self._protocol_version)
1460
 
        return server
 
1506
        return self._auth_server(protocol_version=self._protocol_version)
1461
1507
 
1462
1508
    def _testing_pycurl(self):
1463
1509
        return pycurl_present and self._transport == PyCurlTransport
1514
1560
        # initial 'who are you' and 'this is not you, who are you')
1515
1561
        self.assertEqual(2, self.server.auth_required_errors)
1516
1562
 
 
1563
    def test_prompt_for_username(self):
 
1564
        if self._testing_pycurl():
 
1565
            raise tests.TestNotApplicable(
 
1566
                'pycurl cannot prompt, it handles auth by embedding'
 
1567
                ' user:pass in urls only')
 
1568
 
 
1569
        self.server.add_user('joe', 'foo')
 
1570
        t = self.get_user_transport(None, None)
 
1571
        stdout = tests.StringIOWrapper()
 
1572
        stderr = tests.StringIOWrapper()
 
1573
        ui.ui_factory = tests.TestUIFactory(stdin='joe\nfoo\n',
 
1574
                                            stdout=stdout, stderr=stderr)
 
1575
        self.assertEqual('contents of a\n',t.get('a').read())
 
1576
        # stdin should be empty
 
1577
        self.assertEqual('', ui.ui_factory.stdin.readline())
 
1578
        stderr.seek(0)
 
1579
        expected_prompt = self._expected_username_prompt(t._unqualified_scheme)
 
1580
        self.assertEquals(expected_prompt, stderr.read(len(expected_prompt)))
 
1581
        self.assertEquals('', stdout.getvalue())
 
1582
        self._check_password_prompt(t._unqualified_scheme, 'joe',
 
1583
                                    stderr.readline())
 
1584
 
1517
1585
    def test_prompt_for_password(self):
1518
1586
        if self._testing_pycurl():
1519
1587
            raise tests.TestNotApplicable(
1523
1591
        self.server.add_user('joe', 'foo')
1524
1592
        t = self.get_user_transport('joe', None)
1525
1593
        stdout = tests.StringIOWrapper()
1526
 
        ui.ui_factory = tests.TestUIFactory(stdin='foo\n', stdout=stdout)
1527
 
        self.assertEqual('contents of a\n',t.get('a').read())
 
1594
        stderr = tests.StringIOWrapper()
 
1595
        ui.ui_factory = tests.TestUIFactory(stdin='foo\n',
 
1596
                                            stdout=stdout, stderr=stderr)
 
1597
        self.assertEqual('contents of a\n', t.get('a').read())
1528
1598
        # stdin should be empty
1529
1599
        self.assertEqual('', ui.ui_factory.stdin.readline())
1530
1600
        self._check_password_prompt(t._unqualified_scheme, 'joe',
1531
 
                                    stdout.getvalue())
 
1601
                                    stderr.getvalue())
 
1602
        self.assertEquals('', stdout.getvalue())
1532
1603
        # And we shouldn't prompt again for a different request
1533
1604
        # against the same transport.
1534
1605
        self.assertEqual('contents of b\n',t.get('b').read())
1546
1617
                                 self.server.auth_realm)))
1547
1618
        self.assertEquals(expected_prompt, actual_prompt)
1548
1619
 
 
1620
    def _expected_username_prompt(self, scheme):
 
1621
        return (self._username_prompt_prefix
 
1622
                + "%s %s:%d, Realm: '%s' username: " % (scheme.upper(),
 
1623
                                 self.server.host, self.server.port,
 
1624
                                 self.server.auth_realm))
 
1625
 
1549
1626
    def test_no_prompt_for_password_when_using_auth_config(self):
1550
1627
        if self._testing_pycurl():
1551
1628
            raise tests.TestNotApplicable(
1592
1669
        self.assertEqual(1, self.server.auth_required_errors)
1593
1670
 
1594
1671
    def test_changing_nonce(self):
1595
 
        if self._auth_scheme != 'digest':
1596
 
            raise tests.TestNotApplicable('HTTP auth digest only test')
 
1672
        if self._auth_server not in (http_utils.HTTPDigestAuthServer,
 
1673
                                     http_utils.ProxyDigestAuthServer):
 
1674
            raise tests.TestNotApplicable('HTTP/proxy auth digest only test')
1597
1675
        if self._testing_pycurl():
1598
1676
            raise tests.KnownFailure(
1599
1677
                'pycurl does not handle a nonce change')
1617
1695
    """Test proxy authentication schemes."""
1618
1696
 
1619
1697
    _auth_header = 'Proxy-authorization'
1620
 
    _password_prompt_prefix='Proxy '
 
1698
    _password_prompt_prefix = 'Proxy '
 
1699
    _username_prompt_prefix = 'Proxy '
1621
1700
 
1622
1701
    def setUp(self):
1623
1702
        super(TestProxyAuth, self).setUp()
1630
1709
                                  ('b-proxied', 'contents of b\n'),
1631
1710
                                  ])
1632
1711
 
1633
 
    def create_transport_readonly_server(self):
1634
 
        if self._auth_scheme == 'basic':
1635
 
            server = http_utils.ProxyBasicAuthServer(
1636
 
                protocol_version=self._protocol_version)
1637
 
        else:
1638
 
            if self._auth_scheme != 'digest':
1639
 
                raise AssertionError('Unknown auth scheme: %r'
1640
 
                                     % self._auth_scheme)
1641
 
            server = http_utils.ProxyDigestAuthServer(
1642
 
                protocol_version=self._protocol_version)
1643
 
        return server
1644
 
 
1645
1712
    def get_user_transport(self, user, password):
1646
1713
        self._install_env({'all_proxy': self.get_user_url(user, password)})
1647
1714
        return self._transport(self.server.get_url())
1905
1972
 
1906
1973
        # We override at class level because constructors may propagate the
1907
1974
        # bound method and render instance overriding ineffective (an
1908
 
        # alternative would be be to define a specific ui factory instead...)
 
1975
        # alternative would be to define a specific ui factory instead...)
1909
1976
        self.orig_report_activity = self._transport._report_activity
1910
1977
        self._transport._report_activity = report_activity
1911
1978