/brz/remove-bazaar

To get this branch, use:
bzr branch http://gegoxaren.bato24.eu/bzr/brz/remove-bazaar

« back to all changes in this revision

Viewing changes to bzrlib/tests/test_http.py

  • Committer: Vincent Ladeuil
  • Date: 2007-12-21 12:20:33 UTC
  • mto: (3146.3.1 179368) (3156.2.1 trunk)
  • mto: This revision was merged to the branch mainline in revision 3158.
  • Revision ID: v.ladeuil+lp@free.fr-20071221122033-42bc21re0zj4kqbg
Merge back test_http_implementations.pc into test_http.py.

* bzrlib/tests/test_http.py: 
Merge test_http_implementations.py now that we have rewritten
load_tests. That should reduce the noise in the final proposed
patch.

* bzrlib/tests/http_server.py:
(TestingHTTPRequestHandler.log_message): Ghaaa, don't over spell-check.

Show diffs side-by-side

added added

removed removed

Lines of Context:
45
45
    http_server,
46
46
    http_utils,
47
47
    )
48
 
from bzrlib.tests.http_utils import (
49
 
    HTTPBasicAuthServer,
50
 
    HTTPDigestAuthServer,
51
 
    HTTPServerRedirecting,
52
 
    ProxyBasicAuthServer,
53
 
    ProxyDigestAuthServer,
54
 
    ProxyServer,
55
 
    TestCaseWithRedirectedWebserver,
56
 
    TestCaseWithTwoWebservers,
57
 
    TestCaseWithWebserver,
58
 
    )
 
48
 
59
49
from bzrlib.transport.http import (
60
50
    extract_auth,
61
 
    HttpTransportBase,
 
51
    _urllib,
62
52
    _urllib2_wrappers,
63
53
    )
64
 
from bzrlib.transport.http._urllib import HttpTransport_urllib
65
 
from bzrlib.transport.http._urllib2_wrappers import (
66
 
    ProxyHandler,
67
 
    Request,
68
 
    )
 
54
 
 
55
 
 
56
try:
 
57
    from bzrlib.transport.http._pycurl import PyCurlTransport
 
58
    pycurl_present = True
 
59
except errors.DependencyNotPresent:
 
60
    pycurl_present = False
 
61
 
 
62
 
 
63
class TransportAdapter(tests.TestScenarioApplier):
 
64
    """Generate the same test for each transport implementation."""
 
65
 
 
66
    def __init__(self):
 
67
        transport_scenarios = [
 
68
            ('urllib', dict(_transport=_urllib.HttpTransport_urllib,
 
69
                            _server=http_server.HttpServer_urllib,
 
70
                            _qualified_prefix='http+urllib',)),
 
71
            ]
 
72
        if pycurl_present:
 
73
            transport_scenarios.append(
 
74
                ('pycurl', dict(_transport=PyCurlTransport,
 
75
                                _server=http_server.HttpServer_PyCurl,
 
76
                                _qualified_prefix='http+pycurl',)))
 
77
        self.scenarios = transport_scenarios
 
78
 
 
79
 
 
80
class TransportProtocolAdapter(TransportAdapter):
 
81
    """Generate the same test for each protocol implementation.
 
82
 
 
83
    In addition to the transport adaptatation that we inherit from.
 
84
    """
 
85
 
 
86
    def __init__(self):
 
87
        super(TransportProtocolAdapter, self).__init__()
 
88
        protocol_scenarios = [
 
89
            ('HTTP/1.0',  dict(_protocol_version='HTTP/1.0')),
 
90
            ('HTTP/1.1',  dict(_protocol_version='HTTP/1.1')),
 
91
            ]
 
92
        self.scenarios = tests.multiply_scenarios(self.scenarios,
 
93
                                                  protocol_scenarios)
 
94
 
 
95
 
 
96
class TransportProtocolAuthenticationAdapter(TransportProtocolAdapter):
 
97
    """Generate the same test for each authentication scheme implementation.
 
98
 
 
99
    In addition to the protocol adaptatation that we inherit from.
 
100
    """
 
101
 
 
102
    def __init__(self):
 
103
        super(TransportProtocolAuthenticationAdapter, self).__init__()
 
104
        auth_scheme_scenarios = [
 
105
            ('basic', dict(_auth_scheme='basic')),
 
106
            ('digest', dict(_auth_scheme='digest')),
 
107
            ]
 
108
 
 
109
        self.scenarios = tests.multiply_scenarios(self.scenarios,
 
110
                                                  auth_scheme_scenarios)
 
111
 
 
112
def load_tests(standard_tests, module, loader):
 
113
    """Multiply tests for http clients and protocol versions."""
 
114
    # one for each transport
 
115
    t_adapter = TransportAdapter()
 
116
    t_classes= (TestHttpTransportRegistration,
 
117
                TestHttpTransportUrls,
 
118
                )
 
119
    is_testing_for_transports = tests.condition_isinstance(t_classes)
 
120
 
 
121
    # multiplied by one for each protocol version
 
122
    tp_adapter = TransportProtocolAdapter()
 
123
    tp_classes= (TestDoCatchRedirections,
 
124
                 TestHTTPConnections,
 
125
                 TestHTTPRedirections,
 
126
                 TestHTTPSilentRedirections,
 
127
                 TestLimitedRangeRequestServer,
 
128
                 TestPost,
 
129
                 TestProxyHttpServer,
 
130
                 TestRanges,
 
131
                 TestSpecificRequestHandler,
 
132
                 )
 
133
    is_also_testing_for_protocols = tests.condition_isinstance(tp_classes)
 
134
 
 
135
    # multiplied by one for each authentication scheme
 
136
    tpa_adapter = TransportProtocolAuthenticationAdapter()
 
137
    tpa_classes = (TestAuth,
 
138
                   )
 
139
    is_also_testing_for_authentication = tests.condition_isinstance(tpa_classes)
 
140
 
 
141
    result = loader.suiteClass()
 
142
    for test in tests.iter_suite_tests(standard_tests):
 
143
        # Each test class is either standalone or testing for some combination
 
144
        # of transport, protocol version, authentication scheme. Use the right
 
145
        # adpater (or none) depending on the class.
 
146
        if is_testing_for_transports(test):
 
147
            result.addTests(t_adapter.adapt(test))
 
148
        elif is_also_testing_for_protocols(test):
 
149
            result.addTests(tp_adapter.adapt(test))
 
150
        elif is_also_testing_for_authentication(test):
 
151
            result.addTests(tpa_adapter.adapt(test))
 
152
        else:
 
153
            result.addTest(test)
 
154
    return result
69
155
 
70
156
 
71
157
class FakeManager(object):
183
269
                          f.credentials[0])
184
270
 
185
271
 
 
272
class TestHttpTransportUrls(tests.TestCase):
 
273
    """Test the http urls."""
 
274
 
 
275
    def test_abs_url(self):
 
276
        """Construction of absolute http URLs"""
 
277
        t = self._transport('http://bazaar-vcs.org/bzr/bzr.dev/')
 
278
        eq = self.assertEqualDiff
 
279
        eq(t.abspath('.'), 'http://bazaar-vcs.org/bzr/bzr.dev')
 
280
        eq(t.abspath('foo/bar'), 'http://bazaar-vcs.org/bzr/bzr.dev/foo/bar')
 
281
        eq(t.abspath('.bzr'), 'http://bazaar-vcs.org/bzr/bzr.dev/.bzr')
 
282
        eq(t.abspath('.bzr/1//2/./3'),
 
283
           'http://bazaar-vcs.org/bzr/bzr.dev/.bzr/1/2/3')
 
284
 
 
285
    def test_invalid_http_urls(self):
 
286
        """Trap invalid construction of urls"""
 
287
        t = self._transport('http://bazaar-vcs.org/bzr/bzr.dev/')
 
288
        self.assertRaises(errors.InvalidURL,
 
289
                          self._transport,
 
290
                          'http://http://bazaar-vcs.org/bzr/bzr.dev/')
 
291
 
 
292
    def test_http_root_urls(self):
 
293
        """Construction of URLs from server root"""
 
294
        t = self._transport('http://bzr.ozlabs.org/')
 
295
        eq = self.assertEqualDiff
 
296
        eq(t.abspath('.bzr/tree-version'),
 
297
           'http://bzr.ozlabs.org/.bzr/tree-version')
 
298
 
 
299
    def test_http_impl_urls(self):
 
300
        """There are servers which ask for particular clients to connect"""
 
301
        server = self._server()
 
302
        try:
 
303
            server.setUp()
 
304
            url = server.get_url()
 
305
            self.assertTrue(url.startswith('%s://' % self._qualified_prefix))
 
306
        finally:
 
307
            server.tearDown()
 
308
 
 
309
 
186
310
class TestHttps_pycurl(TestWithTransport_pycurl, tests.TestCase):
187
311
 
188
312
    # TODO: This should really be moved into another pycurl
224
348
            # Restore the right function
225
349
            pycurl.version_info = version_info_orig
226
350
 
 
351
 
 
352
class TestHTTPConnections(http_utils.TestCaseWithWebserver):
 
353
    """Test the http connections."""
 
354
 
 
355
    def setUp(self):
 
356
        http_utils.TestCaseWithWebserver.setUp(self)
 
357
        self.build_tree(['foo/', 'foo/bar'], line_endings='binary',
 
358
                        transport=self.get_transport())
 
359
 
 
360
    def test_http_has(self):
 
361
        server = self.get_readonly_server()
 
362
        t = self._transport(server.get_url())
 
363
        self.assertEqual(t.has('foo/bar'), True)
 
364
        self.assertEqual(len(server.logs), 1)
 
365
        self.assertContainsRe(server.logs[0],
 
366
            r'"HEAD /foo/bar HTTP/1.." (200|302) - "-" "bzr/')
 
367
 
 
368
    def test_http_has_not_found(self):
 
369
        server = self.get_readonly_server()
 
370
        t = self._transport(server.get_url())
 
371
        self.assertEqual(t.has('not-found'), False)
 
372
        self.assertContainsRe(server.logs[1],
 
373
            r'"HEAD /not-found HTTP/1.." 404 - "-" "bzr/')
 
374
 
 
375
    def test_http_get(self):
 
376
        server = self.get_readonly_server()
 
377
        t = self._transport(server.get_url())
 
378
        fp = t.get('foo/bar')
 
379
        self.assertEqualDiff(
 
380
            fp.read(),
 
381
            'contents of foo/bar\n')
 
382
        self.assertEqual(len(server.logs), 1)
 
383
        self.assertTrue(server.logs[0].find(
 
384
            '"GET /foo/bar HTTP/1.1" 200 - "-" "bzr/%s'
 
385
            % bzrlib.__version__) > -1)
 
386
 
 
387
    def test_get_smart_medium(self):
 
388
        # For HTTP, get_smart_medium should return the transport object.
 
389
        server = self.get_readonly_server()
 
390
        http_transport = self._transport(server.get_url())
 
391
        medium = http_transport.get_smart_medium()
 
392
        self.assertIs(medium, http_transport)
 
393
 
 
394
    def test_has_on_bogus_host(self):
 
395
        # Get a free address and don't 'accept' on it, so that we
 
396
        # can be sure there is no http handler there, but set a
 
397
        # reasonable timeout to not slow down tests too much.
 
398
        default_timeout = socket.getdefaulttimeout()
 
399
        try:
 
400
            socket.setdefaulttimeout(2)
 
401
            s = socket.socket()
 
402
            s.bind(('localhost', 0))
 
403
            t = self._transport('http://%s:%s/' % s.getsockname())
 
404
            self.assertRaises(errors.ConnectionError, t.has, 'foo/bar')
 
405
        finally:
 
406
            socket.setdefaulttimeout(default_timeout)
 
407
 
 
408
 
 
409
class TestHttpTransportRegistration(tests.TestCase):
 
410
    """Test registrations of various http implementations"""
 
411
 
 
412
    def test_http_registered(self):
 
413
        t = transport.get_transport('%s://foo.com/' % self._qualified_prefix)
 
414
        self.assertIsInstance(t, transport.Transport)
 
415
        self.assertIsInstance(t, self._transport)
 
416
 
 
417
 
 
418
class TestPost(tests.TestCase):
 
419
 
 
420
    def test_post_body_is_received(self):
 
421
        server = http_utils.RecordingServer(expect_body_tail='end-of-body')
 
422
        server.setUp()
 
423
        self.addCleanup(server.tearDown)
 
424
        scheme = self._qualified_prefix
 
425
        url = '%s://%s:%s/' % (scheme, server.host, server.port)
 
426
        http_transport = self._transport(url)
 
427
        code, response = http_transport._post('abc def end-of-body')
 
428
        self.assertTrue(
 
429
            server.received_bytes.startswith('POST /.bzr/smart HTTP/1.'))
 
430
        self.assertTrue('content-length: 19\r' in server.received_bytes.lower())
 
431
        # The transport should not be assuming that the server can accept
 
432
        # chunked encoding the first time it connects, because HTTP/1.1, so we
 
433
        # check for the literal string.
 
434
        self.assertTrue(
 
435
            server.received_bytes.endswith('\r\n\r\nabc def end-of-body'))
 
436
 
 
437
 
227
438
class TestRangeHeader(tests.TestCase):
228
439
    """Test range_header method"""
229
440
 
252
463
                          tail=50)
253
464
 
254
465
 
 
466
class TestSpecificRequestHandler(http_utils.TestCaseWithWebserver):
 
467
    """Tests a specific request handler.
 
468
 
 
469
 
 
470
    Daughter class are expected to override _req_handler_class
 
471
    """
 
472
 
 
473
    # Provide a useful default
 
474
    _req_handler_class = http_server.TestingHTTPRequestHandler
 
475
 
 
476
    def create_transport_readonly_server(self):
 
477
        return http_server.HttpServer(self._req_handler_class,
 
478
                                      protocol_version=self._protocol_version)
 
479
 
 
480
 
 
481
class WallRequestHandler(http_server.TestingHTTPRequestHandler):
 
482
    """Whatever request comes in, close the connection"""
 
483
 
 
484
    def handle_one_request(self):
 
485
        """Handle a single HTTP request, by abruptly closing the connection"""
 
486
        self.close_connection = 1
 
487
 
 
488
 
 
489
class TestWallServer(TestSpecificRequestHandler):
 
490
    """Tests exceptions during the connection phase"""
 
491
 
 
492
    _req_handler_class = WallRequestHandler
 
493
 
 
494
    def test_http_has(self):
 
495
        server = self.get_readonly_server()
 
496
        t = self._transport(server.get_url())
 
497
        # Unfortunately httplib (see HTTPResponse._read_status
 
498
        # for details) make no distinction between a closed
 
499
        # socket and badly formatted status line, so we can't
 
500
        # just test for ConnectionError, we have to test
 
501
        # InvalidHttpResponse too.
 
502
        self.assertRaises((errors.ConnectionError, errors.InvalidHttpResponse),
 
503
                          t.has, 'foo/bar')
 
504
 
 
505
    def test_http_get(self):
 
506
        server = self.get_readonly_server()
 
507
        t = self._transport(server.get_url())
 
508
        self.assertRaises((errors.ConnectionError, errors.InvalidHttpResponse),
 
509
                          t.get, 'foo/bar')
 
510
 
 
511
 
 
512
class BadStatusRequestHandler(http_server.TestingHTTPRequestHandler):
 
513
    """Whatever request comes in, returns a bad status"""
 
514
 
 
515
    def parse_request(self):
 
516
        """Fakes handling a single HTTP request, returns a bad status"""
 
517
        ignored = http_server.TestingHTTPRequestHandler.parse_request(self)
 
518
        try:
 
519
            self.send_response(0, "Bad status")
 
520
            self.end_headers()
 
521
        except socket.error, e:
 
522
            # We don't want to pollute the test results with
 
523
            # spurious server errors while test succeed. In our
 
524
            # case, it may occur that the test has already read
 
525
            # the 'Bad Status' and closed the socket while we are
 
526
            # still trying to send some headers... So the test is
 
527
            # ok, but if we raise the exception, the output is
 
528
            # dirty. So we don't raise, but we close the
 
529
            # connection, just to be safe :)
 
530
            spurious = [errno.EPIPE,
 
531
                        errno.ECONNRESET,
 
532
                        errno.ECONNABORTED,
 
533
                        ]
 
534
            if (len(e.args) > 0) and (e.args[0] in spurious):
 
535
                self.close_connection = 1
 
536
                pass
 
537
            else:
 
538
                raise
 
539
        return False
 
540
 
 
541
 
 
542
class TestBadStatusServer(TestSpecificRequestHandler):
 
543
    """Tests bad status from server."""
 
544
 
 
545
    _req_handler_class = BadStatusRequestHandler
 
546
 
 
547
    def test_http_has(self):
 
548
        server = self.get_readonly_server()
 
549
        t = self._transport(server.get_url())
 
550
        self.assertRaises(errors.InvalidHttpResponse, t.has, 'foo/bar')
 
551
 
 
552
    def test_http_get(self):
 
553
        server = self.get_readonly_server()
 
554
        t = self._transport(server.get_url())
 
555
        self.assertRaises(errors.InvalidHttpResponse, t.get, 'foo/bar')
 
556
 
 
557
 
 
558
class InvalidStatusRequestHandler(http_server.TestingHTTPRequestHandler):
 
559
    """Whatever request comes in, returns am invalid status"""
 
560
 
 
561
    def parse_request(self):
 
562
        """Fakes handling a single HTTP request, returns a bad status"""
 
563
        ignored = http_server.TestingHTTPRequestHandler.parse_request(self)
 
564
        self.wfile.write("Invalid status line\r\n")
 
565
        return False
 
566
 
 
567
 
 
568
class TestInvalidStatusServer(TestBadStatusServer):
 
569
    """Tests invalid status from server.
 
570
 
 
571
    Both implementations raises the same error as for a bad status.
 
572
    """
 
573
 
 
574
    _req_handler_class = InvalidStatusRequestHandler
 
575
 
 
576
 
 
577
class BadProtocolRequestHandler(http_server.TestingHTTPRequestHandler):
 
578
    """Whatever request comes in, returns a bad protocol version"""
 
579
 
 
580
    def parse_request(self):
 
581
        """Fakes handling a single HTTP request, returns a bad status"""
 
582
        ignored = http_server.TestingHTTPRequestHandler.parse_request(self)
 
583
        # Returns an invalid protocol version, but curl just
 
584
        # ignores it and those cannot be tested.
 
585
        self.wfile.write("%s %d %s\r\n" % ('HTTP/0.0',
 
586
                                           404,
 
587
                                           'Look at my protocol version'))
 
588
        return False
 
589
 
 
590
 
 
591
class TestBadProtocolServer(TestSpecificRequestHandler):
 
592
    """Tests bad protocol from server."""
 
593
 
 
594
    _req_handler_class = BadProtocolRequestHandler
 
595
 
 
596
    def setUp(self):
 
597
        if pycurl_present and self._transport == PyCurlTransport:
 
598
            raise tests.TestNotApplicable(
 
599
                "pycurl doesn't check the protocol version")
 
600
        super(TestBadProtocolServer, self).setUp()
 
601
 
 
602
    def test_http_has(self):
 
603
        server = self.get_readonly_server()
 
604
        t = self._transport(server.get_url())
 
605
        self.assertRaises(errors.InvalidHttpResponse, t.has, 'foo/bar')
 
606
 
 
607
    def test_http_get(self):
 
608
        server = self.get_readonly_server()
 
609
        t = self._transport(server.get_url())
 
610
        self.assertRaises(errors.InvalidHttpResponse, t.get, 'foo/bar')
 
611
 
 
612
 
 
613
class ForbiddenRequestHandler(http_server.TestingHTTPRequestHandler):
 
614
    """Whatever request comes in, returns a 403 code"""
 
615
 
 
616
    def parse_request(self):
 
617
        """Handle a single HTTP request, by replying we cannot handle it"""
 
618
        ignored = http_server.TestingHTTPRequestHandler.parse_request(self)
 
619
        self.send_error(403)
 
620
        return False
 
621
 
 
622
 
 
623
class TestForbiddenServer(TestSpecificRequestHandler):
 
624
    """Tests forbidden server"""
 
625
 
 
626
    _req_handler_class = ForbiddenRequestHandler
 
627
 
 
628
    def test_http_has(self):
 
629
        server = self.get_readonly_server()
 
630
        t = self._transport(server.get_url())
 
631
        self.assertRaises(errors.TransportError, t.has, 'foo/bar')
 
632
 
 
633
    def test_http_get(self):
 
634
        server = self.get_readonly_server()
 
635
        t = self._transport(server.get_url())
 
636
        self.assertRaises(errors.TransportError, t.get, 'foo/bar')
 
637
 
 
638
 
255
639
class TestRecordingServer(tests.TestCase):
256
640
 
257
641
    def test_create(self):
283
667
        self.assertEqual('abc', server.received_bytes)
284
668
 
285
669
 
 
670
class TestRangeRequestServer(TestSpecificRequestHandler):
 
671
    """Tests readv requests against server.
 
672
 
 
673
    We test against default "normal" server.
 
674
    """
 
675
 
 
676
    def setUp(self):
 
677
        super(TestRangeRequestServer, self).setUp()
 
678
        self.build_tree_contents([('a', '0123456789')],)
 
679
 
 
680
    def test_readv(self):
 
681
        server = self.get_readonly_server()
 
682
        t = self._transport(server.get_url())
 
683
        l = list(t.readv('a', ((0, 1), (1, 1), (3, 2), (9, 1))))
 
684
        self.assertEqual(l[0], (0, '0'))
 
685
        self.assertEqual(l[1], (1, '1'))
 
686
        self.assertEqual(l[2], (3, '34'))
 
687
        self.assertEqual(l[3], (9, '9'))
 
688
 
 
689
    def test_readv_out_of_order(self):
 
690
        server = self.get_readonly_server()
 
691
        t = self._transport(server.get_url())
 
692
        l = list(t.readv('a', ((1, 1), (9, 1), (0, 1), (3, 2))))
 
693
        self.assertEqual(l[0], (1, '1'))
 
694
        self.assertEqual(l[1], (9, '9'))
 
695
        self.assertEqual(l[2], (0, '0'))
 
696
        self.assertEqual(l[3], (3, '34'))
 
697
 
 
698
    def test_readv_invalid_ranges(self):
 
699
        server = self.get_readonly_server()
 
700
        t = self._transport(server.get_url())
 
701
 
 
702
        # This is intentionally reading off the end of the file
 
703
        # since we are sure that it cannot get there
 
704
        self.assertListRaises((errors.InvalidRange, errors.ShortReadvError,),
 
705
                              t.readv, 'a', [(1,1), (8,10)])
 
706
 
 
707
        # This is trying to seek past the end of the file, it should
 
708
        # also raise a special error
 
709
        self.assertListRaises((errors.InvalidRange, errors.ShortReadvError,),
 
710
                              t.readv, 'a', [(12,2)])
 
711
 
 
712
    def test_readv_multiple_get_requests(self):
 
713
        server = self.get_readonly_server()
 
714
        t = self._transport(server.get_url())
 
715
        # force transport to issue multiple requests
 
716
        t._max_readv_combine = 1
 
717
        t._max_get_ranges = 1
 
718
        l = list(t.readv('a', ((0, 1), (1, 1), (3, 2), (9, 1))))
 
719
        self.assertEqual(l[0], (0, '0'))
 
720
        self.assertEqual(l[1], (1, '1'))
 
721
        self.assertEqual(l[2], (3, '34'))
 
722
        self.assertEqual(l[3], (9, '9'))
 
723
        # The server should have issued 4 requests
 
724
        self.assertEqual(4, server.GET_request_nb)
 
725
 
 
726
    def test_readv_get_max_size(self):
 
727
        server = self.get_readonly_server()
 
728
        t = self._transport(server.get_url())
 
729
        # force transport to issue multiple requests by limiting the number of
 
730
        # bytes by request. Note that this apply to coalesced offsets only, a
 
731
        # single range ill keep its size even if bigger than the limit.
 
732
        t._get_max_size = 2
 
733
        l = list(t.readv('a', ((0, 1), (1, 1), (2, 4), (6, 4))))
 
734
        self.assertEqual(l[0], (0, '0'))
 
735
        self.assertEqual(l[1], (1, '1'))
 
736
        self.assertEqual(l[2], (2, '2345'))
 
737
        self.assertEqual(l[3], (6, '6789'))
 
738
        # The server should have issued 3 requests
 
739
        self.assertEqual(3, server.GET_request_nb)
 
740
 
 
741
 
 
742
class SingleRangeRequestHandler(http_server.TestingHTTPRequestHandler):
 
743
    """Always reply to range request as if they were single.
 
744
 
 
745
    Don't be explicit about it, just to annoy the clients.
 
746
    """
 
747
 
 
748
    def get_multiple_ranges(self, file, file_size, ranges):
 
749
        """Answer as if it was a single range request and ignores the rest"""
 
750
        (start, end) = ranges[0]
 
751
        return self.get_single_range(file, file_size, start, end)
 
752
 
 
753
 
 
754
class TestSingleRangeRequestServer(TestRangeRequestServer):
 
755
    """Test readv against a server which accept only single range requests"""
 
756
 
 
757
    _req_handler_class = SingleRangeRequestHandler
 
758
 
 
759
 
 
760
class SingleOnlyRangeRequestHandler(http_server.TestingHTTPRequestHandler):
 
761
    """Only reply to simple range requests, errors out on multiple"""
 
762
 
 
763
    def get_multiple_ranges(self, file, file_size, ranges):
 
764
        """Refuses the multiple ranges request"""
 
765
        if len(ranges) > 1:
 
766
            file.close()
 
767
            self.send_error(416, "Requested range not satisfiable")
 
768
            return
 
769
        (start, end) = ranges[0]
 
770
        return self.get_single_range(file, file_size, start, end)
 
771
 
 
772
 
 
773
class TestSingleOnlyRangeRequestServer(TestRangeRequestServer):
 
774
    """Test readv against a server which only accept single range requests"""
 
775
 
 
776
    _req_handler_class = SingleOnlyRangeRequestHandler
 
777
 
 
778
 
 
779
class NoRangeRequestHandler(http_server.TestingHTTPRequestHandler):
 
780
    """Ignore range requests without notice"""
 
781
 
 
782
    def do_GET(self):
 
783
        # Update the statistics
 
784
        self.server.test_case_server.GET_request_nb += 1
 
785
        # Just bypass the range handling done by TestingHTTPRequestHandler
 
786
        return SimpleHTTPServer.SimpleHTTPRequestHandler.do_GET(self)
 
787
 
 
788
 
 
789
class TestNoRangeRequestServer(TestRangeRequestServer):
 
790
    """Test readv against a server which do not accept range requests"""
 
791
 
 
792
    _req_handler_class = NoRangeRequestHandler
 
793
 
 
794
 
 
795
class LimitedRangeRequestHandler(http_server.TestingHTTPRequestHandler):
 
796
    """Errors out when range specifiers exceed the limit"""
 
797
 
 
798
    def get_multiple_ranges(self, file, file_size, ranges):
 
799
        """Refuses the multiple ranges request"""
 
800
        tcs = self.server.test_case_server
 
801
        if tcs.range_limit is not None and len(ranges) > tcs.range_limit:
 
802
            file.close()
 
803
            # Emulate apache behavior
 
804
            self.send_error(400, "Bad Request")
 
805
            return
 
806
        return http_server.TestingHTTPRequestHandler.get_multiple_ranges(
 
807
            self, file, file_size, ranges)
 
808
 
 
809
 
 
810
class LimitedRangeHTTPServer(http_server.HttpServer):
 
811
    """An HttpServer erroring out on requests with too much range specifiers"""
 
812
 
 
813
    def __init__(self, request_handler=LimitedRangeRequestHandler,
 
814
                 protocol_version=None,
 
815
                 range_limit=None):
 
816
        http_server.HttpServer.__init__(self, request_handler,
 
817
                                        protocol_version=protocol_version)
 
818
        self.range_limit = range_limit
 
819
 
 
820
 
 
821
class TestLimitedRangeRequestServer(http_utils.TestCaseWithWebserver):
 
822
    """Tests readv requests against a server erroring out on too much ranges."""
 
823
 
 
824
    range_limit = 3
 
825
 
 
826
    def create_transport_readonly_server(self):
 
827
        # Requests with more range specifiers will error out
 
828
        return LimitedRangeHTTPServer(range_limit=self.range_limit,
 
829
                                      protocol_version=self._protocol_version)
 
830
 
 
831
    def get_transport(self):
 
832
        return self._transport(self.get_readonly_server().get_url())
 
833
 
 
834
    def setUp(self):
 
835
        http_utils.TestCaseWithWebserver.setUp(self)
 
836
        # We need to manipulate ranges that correspond to real chunks in the
 
837
        # response, so we build a content appropriately.
 
838
        filler = ''.join(['abcdefghij' for x in range(102)])
 
839
        content = ''.join(['%04d' % v + filler for v in range(16)])
 
840
        self.build_tree_contents([('a', content)],)
 
841
 
 
842
    def test_few_ranges(self):
 
843
        t = self.get_transport()
 
844
        l = list(t.readv('a', ((0, 4), (1024, 4), )))
 
845
        self.assertEqual(l[0], (0, '0000'))
 
846
        self.assertEqual(l[1], (1024, '0001'))
 
847
        self.assertEqual(1, self.get_readonly_server().GET_request_nb)
 
848
 
 
849
    def test_more_ranges(self):
 
850
        t = self.get_transport()
 
851
        l = list(t.readv('a', ((0, 4), (1024, 4), (4096, 4), (8192, 4))))
 
852
        self.assertEqual(l[0], (0, '0000'))
 
853
        self.assertEqual(l[1], (1024, '0001'))
 
854
        self.assertEqual(l[2], (4096, '0004'))
 
855
        self.assertEqual(l[3], (8192, '0008'))
 
856
        # The server will refuse to serve the first request (too much ranges),
 
857
        # a second request will succeeds.
 
858
        self.assertEqual(2, self.get_readonly_server().GET_request_nb)
 
859
 
 
860
 
286
861
class TestHttpProxyWhiteBox(tests.TestCase):
287
862
    """Whitebox test proxy http authorization.
288
863
 
321
896
        self.assertRaises(errors.InvalidURL, self._proxied_request)
322
897
 
323
898
 
 
899
class TestProxyHttpServer(http_utils.TestCaseWithTwoWebservers):
 
900
    """Tests proxy server.
 
901
 
 
902
    Be aware that we do not setup a real proxy here. Instead, we
 
903
    check that the *connection* goes through the proxy by serving
 
904
    different content (the faked proxy server append '-proxied'
 
905
    to the file names).
 
906
    """
 
907
 
 
908
    # FIXME: We don't have an https server available, so we don't
 
909
    # test https connections.
 
910
 
 
911
    def setUp(self):
 
912
        super(TestProxyHttpServer, self).setUp()
 
913
        self.build_tree_contents([('foo', 'contents of foo\n'),
 
914
                                  ('foo-proxied', 'proxied contents of foo\n')])
 
915
        # Let's setup some attributes for tests
 
916
        self.server = self.get_readonly_server()
 
917
        self.proxy_address = '%s:%d' % (self.server.host, self.server.port)
 
918
        if self._testing_pycurl():
 
919
            # Oh my ! pycurl does not check for the port as part of
 
920
            # no_proxy :-( So we just test the host part
 
921
            self.no_proxy_host = 'localhost'
 
922
        else:
 
923
            self.no_proxy_host = self.proxy_address
 
924
        # The secondary server is the proxy
 
925
        self.proxy = self.get_secondary_server()
 
926
        self.proxy_url = self.proxy.get_url()
 
927
        self._old_env = {}
 
928
 
 
929
    def _testing_pycurl(self):
 
930
        return pycurl_present and self._transport == PyCurlTransport
 
931
 
 
932
    def create_transport_secondary_server(self):
 
933
        """Creates an http server that will serve files with
 
934
        '-proxied' appended to their names.
 
935
        """
 
936
        return http_utils.ProxyServer(protocol_version=self._protocol_version)
 
937
 
 
938
    def _install_env(self, env):
 
939
        for name, value in env.iteritems():
 
940
            self._old_env[name] = osutils.set_or_unset_env(name, value)
 
941
 
 
942
    def _restore_env(self):
 
943
        for name, value in self._old_env.iteritems():
 
944
            osutils.set_or_unset_env(name, value)
 
945
 
 
946
    def proxied_in_env(self, env):
 
947
        self._install_env(env)
 
948
        url = self.server.get_url()
 
949
        t = self._transport(url)
 
950
        try:
 
951
            self.assertEqual(t.get('foo').read(), 'proxied contents of foo\n')
 
952
        finally:
 
953
            self._restore_env()
 
954
 
 
955
    def not_proxied_in_env(self, env):
 
956
        self._install_env(env)
 
957
        url = self.server.get_url()
 
958
        t = self._transport(url)
 
959
        try:
 
960
            self.assertEqual(t.get('foo').read(), 'contents of foo\n')
 
961
        finally:
 
962
            self._restore_env()
 
963
 
 
964
    def test_http_proxy(self):
 
965
        self.proxied_in_env({'http_proxy': self.proxy_url})
 
966
 
 
967
    def test_HTTP_PROXY(self):
 
968
        if self._testing_pycurl():
 
969
            # pycurl does not check HTTP_PROXY for security reasons
 
970
            # (for use in a CGI context that we do not care
 
971
            # about. Should we ?)
 
972
            raise tests.TestNotApplicable(
 
973
                'pycurl does not check HTTP_PROXY for security reasons')
 
974
        self.proxied_in_env({'HTTP_PROXY': self.proxy_url})
 
975
 
 
976
    def test_all_proxy(self):
 
977
        self.proxied_in_env({'all_proxy': self.proxy_url})
 
978
 
 
979
    def test_ALL_PROXY(self):
 
980
        self.proxied_in_env({'ALL_PROXY': self.proxy_url})
 
981
 
 
982
    def test_http_proxy_with_no_proxy(self):
 
983
        self.not_proxied_in_env({'http_proxy': self.proxy_url,
 
984
                                 'no_proxy': self.no_proxy_host})
 
985
 
 
986
    def test_HTTP_PROXY_with_NO_PROXY(self):
 
987
        if self._testing_pycurl():
 
988
            raise tests.TestNotApplicable(
 
989
                'pycurl does not check HTTP_PROXY for security reasons')
 
990
        self.not_proxied_in_env({'HTTP_PROXY': self.proxy_url,
 
991
                                 'NO_PROXY': self.no_proxy_host})
 
992
 
 
993
    def test_all_proxy_with_no_proxy(self):
 
994
        self.not_proxied_in_env({'all_proxy': self.proxy_url,
 
995
                                 'no_proxy': self.no_proxy_host})
 
996
 
 
997
    def test_ALL_PROXY_with_NO_PROXY(self):
 
998
        self.not_proxied_in_env({'ALL_PROXY': self.proxy_url,
 
999
                                 'NO_PROXY': self.no_proxy_host})
 
1000
 
 
1001
    def test_http_proxy_without_scheme(self):
 
1002
        if self._testing_pycurl():
 
1003
            # pycurl *ignores* invalid proxy env variables. If that ever change
 
1004
            # in the future, this test will fail indicating that pycurl do not
 
1005
            # ignore anymore such variables.
 
1006
            self.not_proxied_in_env({'http_proxy': self.proxy_address})
 
1007
        else:
 
1008
            self.assertRaises(errors.InvalidURL,
 
1009
                              self.proxied_in_env,
 
1010
                              {'http_proxy': self.proxy_address})
 
1011
 
 
1012
 
 
1013
class TestRanges(http_utils.TestCaseWithWebserver):
 
1014
    """Test the Range header in GET methods."""
 
1015
 
 
1016
    def setUp(self):
 
1017
        http_utils.TestCaseWithWebserver.setUp(self)
 
1018
        self.build_tree_contents([('a', '0123456789')],)
 
1019
        server = self.get_readonly_server()
 
1020
        self.transport = self._transport(server.get_url())
 
1021
 
 
1022
    def _file_contents(self, relpath, ranges):
 
1023
        offsets = [ (start, end - start + 1) for start, end in ranges]
 
1024
        coalesce = self.transport._coalesce_offsets
 
1025
        coalesced = list(coalesce(offsets, limit=0, fudge_factor=0))
 
1026
        code, data = self.transport._get(relpath, coalesced)
 
1027
        self.assertTrue(code in (200, 206),'_get returns: %d' % code)
 
1028
        for start, end in ranges:
 
1029
            data.seek(start)
 
1030
            yield data.read(end - start + 1)
 
1031
 
 
1032
    def _file_tail(self, relpath, tail_amount):
 
1033
        code, data = self.transport._get(relpath, [], tail_amount)
 
1034
        self.assertTrue(code in (200, 206),'_get returns: %d' % code)
 
1035
        data.seek(-tail_amount, 2)
 
1036
        return data.read(tail_amount)
 
1037
 
 
1038
    def test_range_header(self):
 
1039
        # Valid ranges
 
1040
        map(self.assertEqual,['0', '234'],
 
1041
            list(self._file_contents('a', [(0,0), (2,4)])),)
 
1042
 
 
1043
    def test_range_header_tail(self):
 
1044
        self.assertEqual('789', self._file_tail('a', 3))
 
1045
 
 
1046
    def test_syntactically_invalid_range_header(self):
 
1047
        self.assertListRaises(errors.InvalidHttpRange,
 
1048
                          self._file_contents, 'a', [(4, 3)])
 
1049
 
 
1050
    def test_semantically_invalid_range_header(self):
 
1051
        self.assertListRaises(errors.InvalidHttpRange,
 
1052
                          self._file_contents, 'a', [(42, 128)])
 
1053
 
 
1054
 
 
1055
class TestHTTPRedirections(http_utils.TestCaseWithRedirectedWebserver):
 
1056
    """Test redirection between http servers."""
 
1057
 
 
1058
    def create_transport_secondary_server(self):
 
1059
        """Create the secondary server redirecting to the primary server"""
 
1060
        new = self.get_readonly_server()
 
1061
 
 
1062
        redirecting = http_utils.HTTPServerRedirecting(
 
1063
            protocol_version=self._protocol_version)
 
1064
        redirecting.redirect_to(new.host, new.port)
 
1065
        return redirecting
 
1066
 
 
1067
    def setUp(self):
 
1068
        super(TestHTTPRedirections, self).setUp()
 
1069
        self.build_tree_contents([('a', '0123456789'),
 
1070
                                  ('bundle',
 
1071
                                  '# Bazaar revision bundle v0.9\n#\n')
 
1072
                                  ],)
 
1073
 
 
1074
        self.old_transport = self._transport(self.old_server.get_url())
 
1075
 
 
1076
    def test_redirected(self):
 
1077
        self.assertRaises(errors.RedirectRequested, self.old_transport.get, 'a')
 
1078
        t = self._transport(self.new_server.get_url())
 
1079
        self.assertEqual('0123456789', t.get('a').read())
 
1080
 
 
1081
    def test_read_redirected_bundle_from_url(self):
 
1082
        from bzrlib.bundle import read_bundle_from_url
 
1083
        url = self.old_transport.abspath('bundle')
 
1084
        bundle = read_bundle_from_url(url)
 
1085
        # If read_bundle_from_url was successful we get an empty bundle
 
1086
        self.assertEqual([], bundle.revisions)
 
1087
 
 
1088
 
 
1089
class RedirectedRequest(_urllib2_wrappers.Request):
 
1090
    """Request following redirections. """
 
1091
 
 
1092
    init_orig = _urllib2_wrappers.Request.__init__
 
1093
 
 
1094
    def __init__(self, method, url, *args, **kwargs):
 
1095
        """Constructor.
 
1096
 
 
1097
        """
 
1098
        # Since the tests using this class will replace
 
1099
        # _urllib2_wrappers.Request, we can't just call the base class __init__
 
1100
        # or we'll loop.
 
1101
        RedirectedRequest.init_orig(self, method, url, args, kwargs)
 
1102
        self.follow_redirections = True
 
1103
 
 
1104
 
 
1105
class TestHTTPSilentRedirections(http_utils.TestCaseWithRedirectedWebserver):
 
1106
    """Test redirections.
 
1107
 
 
1108
    http implementations do not redirect silently anymore (they
 
1109
    do not redirect at all in fact). The mechanism is still in
 
1110
    place at the _urllib2_wrappers.Request level and these tests
 
1111
    exercise it.
 
1112
 
 
1113
    For the pycurl implementation
 
1114
    the redirection have been deleted as we may deprecate pycurl
 
1115
    and I have no place to keep a working implementation.
 
1116
    -- vila 20070212
 
1117
    """
 
1118
 
 
1119
    def setUp(self):
 
1120
        if pycurl_present and self._transport == PyCurlTransport:
 
1121
            raise tests.TestNotApplicable(
 
1122
                "pycurl doesn't redirect silently annymore")
 
1123
        super(TestHTTPSilentRedirections, self).setUp()
 
1124
        self.setup_redirected_request()
 
1125
        self.addCleanup(self.cleanup_redirected_request)
 
1126
        self.build_tree_contents([('a','a'),
 
1127
                                  ('1/',),
 
1128
                                  ('1/a', 'redirected once'),
 
1129
                                  ('2/',),
 
1130
                                  ('2/a', 'redirected twice'),
 
1131
                                  ('3/',),
 
1132
                                  ('3/a', 'redirected thrice'),
 
1133
                                  ('4/',),
 
1134
                                  ('4/a', 'redirected 4 times'),
 
1135
                                  ('5/',),
 
1136
                                  ('5/a', 'redirected 5 times'),
 
1137
                                  ],)
 
1138
 
 
1139
        self.old_transport = self._transport(self.old_server.get_url())
 
1140
 
 
1141
    def setup_redirected_request(self):
 
1142
        self.original_class = _urllib2_wrappers.Request
 
1143
        _urllib2_wrappers.Request = RedirectedRequest
 
1144
 
 
1145
    def cleanup_redirected_request(self):
 
1146
        _urllib2_wrappers.Request = self.original_class
 
1147
 
 
1148
    def create_transport_secondary_server(self):
 
1149
        """Create the secondary server, redirections are defined in the tests"""
 
1150
        return http_utils.HTTPServerRedirecting(
 
1151
            protocol_version=self._protocol_version)
 
1152
 
 
1153
    def test_one_redirection(self):
 
1154
        t = self.old_transport
 
1155
 
 
1156
        req = RedirectedRequest('GET', t.abspath('a'))
 
1157
        req.follow_redirections = True
 
1158
        new_prefix = 'http://%s:%s' % (self.new_server.host,
 
1159
                                       self.new_server.port)
 
1160
        self.old_server.redirections = \
 
1161
            [('(.*)', r'%s/1\1' % (new_prefix), 301),]
 
1162
        self.assertEquals('redirected once',t._perform(req).read())
 
1163
 
 
1164
    def test_five_redirections(self):
 
1165
        t = self.old_transport
 
1166
 
 
1167
        req = RedirectedRequest('GET', t.abspath('a'))
 
1168
        req.follow_redirections = True
 
1169
        old_prefix = 'http://%s:%s' % (self.old_server.host,
 
1170
                                       self.old_server.port)
 
1171
        new_prefix = 'http://%s:%s' % (self.new_server.host,
 
1172
                                       self.new_server.port)
 
1173
        self.old_server.redirections = \
 
1174
            [('/1(.*)', r'%s/2\1' % (old_prefix), 302),
 
1175
             ('/2(.*)', r'%s/3\1' % (old_prefix), 303),
 
1176
             ('/3(.*)', r'%s/4\1' % (old_prefix), 307),
 
1177
             ('/4(.*)', r'%s/5\1' % (new_prefix), 301),
 
1178
             ('(/[^/]+)', r'%s/1\1' % (old_prefix), 301),
 
1179
             ]
 
1180
        self.assertEquals('redirected 5 times',t._perform(req).read())
 
1181
 
 
1182
 
 
1183
class TestDoCatchRedirections(http_utils.TestCaseWithRedirectedWebserver):
 
1184
    """Test transport.do_catching_redirections."""
 
1185
 
 
1186
    def setUp(self):
 
1187
        super(TestDoCatchRedirections, self).setUp()
 
1188
        self.build_tree_contents([('a', '0123456789'),],)
 
1189
 
 
1190
        self.old_transport = self._transport(self.old_server.get_url())
 
1191
 
 
1192
    def get_a(self, transport):
 
1193
        return transport.get('a')
 
1194
 
 
1195
    def test_no_redirection(self):
 
1196
        t = self._transport(self.new_server.get_url())
 
1197
 
 
1198
        # We use None for redirected so that we fail if redirected
 
1199
        self.assertEquals('0123456789',
 
1200
                          transport.do_catching_redirections(
 
1201
                self.get_a, t, None).read())
 
1202
 
 
1203
    def test_one_redirection(self):
 
1204
        self.redirections = 0
 
1205
 
 
1206
        def redirected(transport, exception, redirection_notice):
 
1207
            self.redirections += 1
 
1208
            dir, file = urlutils.split(exception.target)
 
1209
            return self._transport(dir)
 
1210
 
 
1211
        self.assertEquals('0123456789',
 
1212
                          transport.do_catching_redirections(
 
1213
                self.get_a, self.old_transport, redirected).read())
 
1214
        self.assertEquals(1, self.redirections)
 
1215
 
 
1216
    def test_redirection_loop(self):
 
1217
 
 
1218
        def redirected(transport, exception, redirection_notice):
 
1219
            # By using the redirected url as a base dir for the
 
1220
            # *old* transport, we create a loop: a => a/a =>
 
1221
            # a/a/a
 
1222
            return self.old_transport.clone(exception.target)
 
1223
 
 
1224
        self.assertRaises(errors.TooManyRedirections,
 
1225
                          transport.do_catching_redirections,
 
1226
                          self.get_a, self.old_transport, redirected)
 
1227
 
 
1228
 
 
1229
class TestAuth(http_utils.TestCaseWithWebserver):
 
1230
    """Test authentication scheme"""
 
1231
 
 
1232
    _auth_header = 'Authorization'
 
1233
    _password_prompt_prefix = ''
 
1234
 
 
1235
    def setUp(self):
 
1236
        super(TestAuth, self).setUp()
 
1237
        self.server = self.get_readonly_server()
 
1238
        self.build_tree_contents([('a', 'contents of a\n'),
 
1239
                                  ('b', 'contents of b\n'),])
 
1240
 
 
1241
    def create_transport_readonly_server(self):
 
1242
        if self._auth_scheme == 'basic':
 
1243
            server = http_utils.HTTPBasicAuthServer(
 
1244
                protocol_version=self._protocol_version)
 
1245
        else:
 
1246
            if self._auth_scheme != 'digest':
 
1247
                raise AssertionError('Unknown auth scheme: %r'
 
1248
                                     % self._auth_scheme)
 
1249
            server = http_utils.HTTPDigestAuthServer(
 
1250
                protocol_version=self._protocol_version)
 
1251
        return server
 
1252
 
 
1253
    def _testing_pycurl(self):
 
1254
        return pycurl_present and self._transport == PyCurlTransport
 
1255
 
 
1256
    def get_user_url(self, user=None, password=None):
 
1257
        """Build an url embedding user and password"""
 
1258
        url = '%s://' % self.server._url_protocol
 
1259
        if user is not None:
 
1260
            url += user
 
1261
            if password is not None:
 
1262
                url += ':' + password
 
1263
            url += '@'
 
1264
        url += '%s:%s/' % (self.server.host, self.server.port)
 
1265
        return url
 
1266
 
 
1267
    def get_user_transport(self, user=None, password=None):
 
1268
        return self._transport(self.get_user_url(user, password))
 
1269
 
 
1270
    def test_no_user(self):
 
1271
        self.server.add_user('joe', 'foo')
 
1272
        t = self.get_user_transport()
 
1273
        self.assertRaises(errors.InvalidHttpResponse, t.get, 'a')
 
1274
        # Only one 'Authentication Required' error should occur
 
1275
        self.assertEqual(1, self.server.auth_required_errors)
 
1276
 
 
1277
    def test_empty_pass(self):
 
1278
        self.server.add_user('joe', '')
 
1279
        t = self.get_user_transport('joe', '')
 
1280
        self.assertEqual('contents of a\n', t.get('a').read())
 
1281
        # Only one 'Authentication Required' error should occur
 
1282
        self.assertEqual(1, self.server.auth_required_errors)
 
1283
 
 
1284
    def test_user_pass(self):
 
1285
        self.server.add_user('joe', 'foo')
 
1286
        t = self.get_user_transport('joe', 'foo')
 
1287
        self.assertEqual('contents of a\n', t.get('a').read())
 
1288
        # Only one 'Authentication Required' error should occur
 
1289
        self.assertEqual(1, self.server.auth_required_errors)
 
1290
 
 
1291
    def test_unknown_user(self):
 
1292
        self.server.add_user('joe', 'foo')
 
1293
        t = self.get_user_transport('bill', 'foo')
 
1294
        self.assertRaises(errors.InvalidHttpResponse, t.get, 'a')
 
1295
        # Two 'Authentication Required' errors should occur (the
 
1296
        # initial 'who are you' and 'I don't know you, who are
 
1297
        # you').
 
1298
        self.assertEqual(2, self.server.auth_required_errors)
 
1299
 
 
1300
    def test_wrong_pass(self):
 
1301
        self.server.add_user('joe', 'foo')
 
1302
        t = self.get_user_transport('joe', 'bar')
 
1303
        self.assertRaises(errors.InvalidHttpResponse, t.get, 'a')
 
1304
        # Two 'Authentication Required' errors should occur (the
 
1305
        # initial 'who are you' and 'this is not you, who are you')
 
1306
        self.assertEqual(2, self.server.auth_required_errors)
 
1307
 
 
1308
    def test_prompt_for_password(self):
 
1309
        if self._testing_pycurl():
 
1310
            raise tests.TestNotApplicable(
 
1311
                'pycurl cannot prompt, it handles auth by embedding'
 
1312
                ' user:pass in urls only')
 
1313
 
 
1314
        self.server.add_user('joe', 'foo')
 
1315
        t = self.get_user_transport('joe', None)
 
1316
        stdout = tests.StringIOWrapper()
 
1317
        ui.ui_factory = tests.TestUIFactory(stdin='foo\n', stdout=stdout)
 
1318
        self.assertEqual('contents of a\n',t.get('a').read())
 
1319
        # stdin should be empty
 
1320
        self.assertEqual('', ui.ui_factory.stdin.readline())
 
1321
        self._check_password_prompt(t._unqualified_scheme, 'joe',
 
1322
                                    stdout.getvalue())
 
1323
        # And we shouldn't prompt again for a different request
 
1324
        # against the same transport.
 
1325
        self.assertEqual('contents of b\n',t.get('b').read())
 
1326
        t2 = t.clone()
 
1327
        # And neither against a clone
 
1328
        self.assertEqual('contents of b\n',t2.get('b').read())
 
1329
        # Only one 'Authentication Required' error should occur
 
1330
        self.assertEqual(1, self.server.auth_required_errors)
 
1331
 
 
1332
    def _check_password_prompt(self, scheme, user, actual_prompt):
 
1333
        expected_prompt = (self._password_prompt_prefix
 
1334
                           + ("%s %s@%s:%d, Realm: '%s' password: "
 
1335
                              % (scheme.upper(),
 
1336
                                 user, self.server.host, self.server.port,
 
1337
                                 self.server.auth_realm)))
 
1338
        self.assertEquals(expected_prompt, actual_prompt)
 
1339
 
 
1340
    def test_no_prompt_for_password_when_using_auth_config(self):
 
1341
        if self._testing_pycurl():
 
1342
            raise tests.TestNotApplicable(
 
1343
                'pycurl does not support authentication.conf'
 
1344
                ' since it cannot prompt')
 
1345
 
 
1346
        user =' joe'
 
1347
        password = 'foo'
 
1348
        stdin_content = 'bar\n'  # Not the right password
 
1349
        self.server.add_user(user, password)
 
1350
        t = self.get_user_transport(user, None)
 
1351
        ui.ui_factory = tests.TestUIFactory(stdin=stdin_content,
 
1352
                                            stdout=tests.StringIOWrapper())
 
1353
        # Create a minimal config file with the right password
 
1354
        conf = config.AuthenticationConfig()
 
1355
        conf._get_config().update(
 
1356
            {'httptest': {'scheme': 'http', 'port': self.server.port,
 
1357
                          'user': user, 'password': password}})
 
1358
        conf._save()
 
1359
        # Issue a request to the server to connect
 
1360
        self.assertEqual('contents of a\n',t.get('a').read())
 
1361
        # stdin should have  been left untouched
 
1362
        self.assertEqual(stdin_content, ui.ui_factory.stdin.readline())
 
1363
        # Only one 'Authentication Required' error should occur
 
1364
        self.assertEqual(1, self.server.auth_required_errors)
 
1365
 
 
1366
 
 
1367
 
 
1368
class TestProxyAuth(TestAuth):
 
1369
    """Test proxy authentication schemes."""
 
1370
 
 
1371
    _auth_header = 'Proxy-authorization'
 
1372
    _password_prompt_prefix='Proxy '
 
1373
 
 
1374
    def setUp(self):
 
1375
        super(TestProxyAuth, self).setUp()
 
1376
        self._old_env = {}
 
1377
        self.addCleanup(self._restore_env)
 
1378
        # Override the contents to avoid false positives
 
1379
        self.build_tree_contents([('a', 'not proxied contents of a\n'),
 
1380
                                  ('b', 'not proxied contents of b\n'),
 
1381
                                  ('a-proxied', 'contents of a\n'),
 
1382
                                  ('b-proxied', 'contents of b\n'),
 
1383
                                  ])
 
1384
 
 
1385
    def create_transport_readonly_server(self):
 
1386
        if self._auth_scheme == 'basic':
 
1387
            server = http_utils.ProxyBasicAuthServer(
 
1388
                protocol_version=self._protocol_version)
 
1389
        else:
 
1390
            if self._auth_scheme != 'digest':
 
1391
                raise AssertionError('Unknown auth scheme: %r'
 
1392
                                     % self._auth_scheme)
 
1393
            server = http_utils.ProxyDigestAuthServer(
 
1394
                protocol_version=self._protocol_version)
 
1395
        return server
 
1396
 
 
1397
    def get_user_transport(self, user=None, password=None):
 
1398
        self._install_env({'all_proxy': self.get_user_url(user, password)})
 
1399
        return self._transport(self.server.get_url())
 
1400
 
 
1401
    def _install_env(self, env):
 
1402
        for name, value in env.iteritems():
 
1403
            self._old_env[name] = osutils.set_or_unset_env(name, value)
 
1404
 
 
1405
    def _restore_env(self):
 
1406
        for name, value in self._old_env.iteritems():
 
1407
            osutils.set_or_unset_env(name, value)
 
1408
 
 
1409
    def test_empty_pass(self):
 
1410
        if self._testing_pycurl():
 
1411
            import pycurl
 
1412
            if pycurl.version_info()[1] < '7.16.0':
 
1413
                raise tests.KnownFailure(
 
1414
                    'pycurl < 7.16.0 does not handle empty proxy passwords')
 
1415
        super(TestProxyAuth, self).test_empty_pass()
 
1416