/brz/remove-bazaar

To get this branch, use:
bzr branch http://gegoxaren.bato24.eu/bzr/brz/remove-bazaar

« back to all changes in this revision

Viewing changes to bzrlib/tests/test_http.py

  • Committer: Andrew Bennetts
  • Date: 2008-12-16 02:58:31 UTC
  • mto: This revision was merged to the branch mainline in revision 3910.
  • Revision ID: andrew.bennetts@canonical.com-20081216025831-vgwlxfoz7n9b8fyh
Skip test for two formats, and fix format 5 by avoiding a full history sync with non-format5 branches.

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
# Copyright (C) 2005, 2006 Canonical Ltd
 
2
#
 
3
# This program is free software; you can redistribute it and/or modify
 
4
# it under the terms of the GNU General Public License as published by
 
5
# the Free Software Foundation; either version 2 of the License, or
 
6
# (at your option) any later version.
 
7
#
 
8
# This program is distributed in the hope that it will be useful,
 
9
# but WITHOUT ANY WARRANTY; without even the implied warranty of
 
10
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
11
# GNU General Public License for more details.
 
12
#
 
13
# You should have received a copy of the GNU General Public License
 
14
# along with this program; if not, write to the Free Software
 
15
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
 
16
 
 
17
"""Tests for HTTP implementations.
 
18
 
 
19
This module defines a load_tests() method that parametrize tests classes for
 
20
transport implementation, http protocol versions and authentication schemes.
 
21
"""
 
22
 
 
23
# TODO: Should be renamed to bzrlib.transport.http.tests?
 
24
# TODO: What about renaming to bzrlib.tests.transport.http ?
 
25
 
 
26
from cStringIO import StringIO
 
27
import httplib
 
28
import os
 
29
import select
 
30
import SimpleHTTPServer
 
31
import socket
 
32
import sys
 
33
import threading
 
34
 
 
35
import bzrlib
 
36
from bzrlib import (
 
37
    bzrdir,
 
38
    config,
 
39
    errors,
 
40
    osutils,
 
41
    remote as _mod_remote,
 
42
    tests,
 
43
    transport,
 
44
    ui,
 
45
    urlutils,
 
46
    )
 
47
from bzrlib.tests import (
 
48
    http_server,
 
49
    http_utils,
 
50
    )
 
51
from bzrlib.transport import (
 
52
    http,
 
53
    remote,
 
54
    )
 
55
from bzrlib.transport.http import (
 
56
    _urllib,
 
57
    _urllib2_wrappers,
 
58
    )
 
59
 
 
60
 
 
61
try:
 
62
    from bzrlib.transport.http._pycurl import PyCurlTransport
 
63
    pycurl_present = True
 
64
except errors.DependencyNotPresent:
 
65
    pycurl_present = False
 
66
 
 
67
 
 
68
class TransportAdapter(tests.TestScenarioApplier):
 
69
    """Generate the same test for each transport implementation."""
 
70
 
 
71
    def __init__(self):
 
72
        transport_scenarios = [
 
73
            ('urllib', dict(_transport=_urllib.HttpTransport_urllib,
 
74
                            _server=http_server.HttpServer_urllib,
 
75
                            _qualified_prefix='http+urllib',)),
 
76
            ]
 
77
        if pycurl_present:
 
78
            transport_scenarios.append(
 
79
                ('pycurl', dict(_transport=PyCurlTransport,
 
80
                                _server=http_server.HttpServer_PyCurl,
 
81
                                _qualified_prefix='http+pycurl',)))
 
82
        self.scenarios = transport_scenarios
 
83
 
 
84
 
 
85
class TransportProtocolAdapter(TransportAdapter):
 
86
    """Generate the same test for each protocol implementation.
 
87
 
 
88
    In addition to the transport adaptatation that we inherit from.
 
89
    """
 
90
 
 
91
    def __init__(self):
 
92
        super(TransportProtocolAdapter, self).__init__()
 
93
        protocol_scenarios = [
 
94
            ('HTTP/1.0',  dict(_protocol_version='HTTP/1.0')),
 
95
            ('HTTP/1.1',  dict(_protocol_version='HTTP/1.1')),
 
96
            ]
 
97
        self.scenarios = tests.multiply_scenarios(self.scenarios,
 
98
                                                  protocol_scenarios)
 
99
 
 
100
 
 
101
class TransportProtocolAuthenticationAdapter(TransportProtocolAdapter):
 
102
    """Generate the same test for each authentication scheme implementation.
 
103
 
 
104
    In addition to the protocol adaptatation that we inherit from.
 
105
    """
 
106
 
 
107
    def __init__(self):
 
108
        super(TransportProtocolAuthenticationAdapter, self).__init__()
 
109
        auth_scheme_scenarios = [
 
110
            ('basic', dict(_auth_scheme='basic')),
 
111
            ('digest', dict(_auth_scheme='digest')),
 
112
            ]
 
113
 
 
114
        self.scenarios = tests.multiply_scenarios(self.scenarios,
 
115
                                                  auth_scheme_scenarios)
 
116
 
 
117
def load_tests(standard_tests, module, loader):
 
118
    """Multiply tests for http clients and protocol versions."""
 
119
    # one for each transport
 
120
    t_adapter = TransportAdapter()
 
121
    t_classes= (TestHttpTransportRegistration,
 
122
                TestHttpTransportUrls,
 
123
                Test_redirected_to,
 
124
                )
 
125
    is_testing_for_transports = tests.condition_isinstance(t_classes)
 
126
 
 
127
    # multiplied by one for each protocol version
 
128
    tp_adapter = TransportProtocolAdapter()
 
129
    tp_classes= (SmartHTTPTunnellingTest,
 
130
                 TestDoCatchRedirections,
 
131
                 TestHTTPConnections,
 
132
                 TestHTTPRedirections,
 
133
                 TestHTTPSilentRedirections,
 
134
                 TestLimitedRangeRequestServer,
 
135
                 TestPost,
 
136
                 TestProxyHttpServer,
 
137
                 TestRanges,
 
138
                 TestSpecificRequestHandler,
 
139
                 )
 
140
    is_also_testing_for_protocols = tests.condition_isinstance(tp_classes)
 
141
 
 
142
    # multiplied by one for each authentication scheme
 
143
    tpa_adapter = TransportProtocolAuthenticationAdapter()
 
144
    tpa_classes = (TestAuth,
 
145
                   )
 
146
    is_also_testing_for_authentication = tests.condition_isinstance(
 
147
        tpa_classes)
 
148
 
 
149
    result = loader.suiteClass()
 
150
    for test_class in tests.iter_suite_tests(standard_tests):
 
151
        # Each test class is either standalone or testing for some combination
 
152
        # of transport, protocol version, authentication scheme. Use the right
 
153
        # adpater (or none) depending on the class.
 
154
        if is_testing_for_transports(test_class):
 
155
            result.addTests(t_adapter.adapt(test_class))
 
156
        elif is_also_testing_for_protocols(test_class):
 
157
            result.addTests(tp_adapter.adapt(test_class))
 
158
        elif is_also_testing_for_authentication(test_class):
 
159
            result.addTests(tpa_adapter.adapt(test_class))
 
160
        else:
 
161
            result.addTest(test_class)
 
162
    return result
 
163
 
 
164
 
 
165
class FakeManager(object):
 
166
 
 
167
    def __init__(self):
 
168
        self.credentials = []
 
169
 
 
170
    def add_password(self, realm, host, username, password):
 
171
        self.credentials.append([realm, host, username, password])
 
172
 
 
173
 
 
174
class RecordingServer(object):
 
175
    """A fake HTTP server.
 
176
    
 
177
    It records the bytes sent to it, and replies with a 200.
 
178
    """
 
179
 
 
180
    def __init__(self, expect_body_tail=None):
 
181
        """Constructor.
 
182
 
 
183
        :type expect_body_tail: str
 
184
        :param expect_body_tail: a reply won't be sent until this string is
 
185
            received.
 
186
        """
 
187
        self._expect_body_tail = expect_body_tail
 
188
        self.host = None
 
189
        self.port = None
 
190
        self.received_bytes = ''
 
191
 
 
192
    def setUp(self):
 
193
        self._sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
 
194
        self._sock.bind(('127.0.0.1', 0))
 
195
        self.host, self.port = self._sock.getsockname()
 
196
        self._ready = threading.Event()
 
197
        self._thread = threading.Thread(target=self._accept_read_and_reply)
 
198
        self._thread.setDaemon(True)
 
199
        self._thread.start()
 
200
        self._ready.wait(5)
 
201
 
 
202
    def _accept_read_and_reply(self):
 
203
        self._sock.listen(1)
 
204
        self._ready.set()
 
205
        self._sock.settimeout(5)
 
206
        try:
 
207
            conn, address = self._sock.accept()
 
208
            # On win32, the accepted connection will be non-blocking to start
 
209
            # with because we're using settimeout.
 
210
            conn.setblocking(True)
 
211
            while not self.received_bytes.endswith(self._expect_body_tail):
 
212
                self.received_bytes += conn.recv(4096)
 
213
            conn.sendall('HTTP/1.1 200 OK\r\n')
 
214
        except socket.timeout:
 
215
            # Make sure the client isn't stuck waiting for us to e.g. accept.
 
216
            self._sock.close()
 
217
        except socket.error:
 
218
            # The client may have already closed the socket.
 
219
            pass
 
220
 
 
221
    def tearDown(self):
 
222
        try:
 
223
            self._sock.close()
 
224
        except socket.error:
 
225
            # We might have already closed it.  We don't care.
 
226
            pass
 
227
        self.host = None
 
228
        self.port = None
 
229
 
 
230
 
 
231
class TestHTTPServer(tests.TestCase):
 
232
    """Test the HTTP servers implementations."""
 
233
 
 
234
    def test_invalid_protocol(self):
 
235
        class BogusRequestHandler(http_server.TestingHTTPRequestHandler):
 
236
 
 
237
            protocol_version = 'HTTP/0.1'
 
238
 
 
239
        server = http_server.HttpServer(BogusRequestHandler)
 
240
        try:
 
241
            self.assertRaises(httplib.UnknownProtocol,server.setUp)
 
242
        except:
 
243
            server.tearDown()
 
244
            self.fail('HTTP Server creation did not raise UnknownProtocol')
 
245
 
 
246
    def test_force_invalid_protocol(self):
 
247
        server = http_server.HttpServer(protocol_version='HTTP/0.1')
 
248
        try:
 
249
            self.assertRaises(httplib.UnknownProtocol,server.setUp)
 
250
        except:
 
251
            server.tearDown()
 
252
            self.fail('HTTP Server creation did not raise UnknownProtocol')
 
253
 
 
254
    def test_server_start_and_stop(self):
 
255
        server = http_server.HttpServer()
 
256
        server.setUp()
 
257
        self.assertTrue(server._http_running)
 
258
        server.tearDown()
 
259
        self.assertFalse(server._http_running)
 
260
 
 
261
    def test_create_http_server_one_zero(self):
 
262
        class RequestHandlerOneZero(http_server.TestingHTTPRequestHandler):
 
263
 
 
264
            protocol_version = 'HTTP/1.0'
 
265
 
 
266
        server = http_server.HttpServer(RequestHandlerOneZero)
 
267
        server.setUp()
 
268
        self.addCleanup(server.tearDown)
 
269
        self.assertIsInstance(server._httpd, http_server.TestingHTTPServer)
 
270
 
 
271
    def test_create_http_server_one_one(self):
 
272
        class RequestHandlerOneOne(http_server.TestingHTTPRequestHandler):
 
273
 
 
274
            protocol_version = 'HTTP/1.1'
 
275
 
 
276
        server = http_server.HttpServer(RequestHandlerOneOne)
 
277
        server.setUp()
 
278
        self.addCleanup(server.tearDown)
 
279
        self.assertIsInstance(server._httpd,
 
280
                              http_server.TestingThreadingHTTPServer)
 
281
 
 
282
    def test_create_http_server_force_one_one(self):
 
283
        class RequestHandlerOneZero(http_server.TestingHTTPRequestHandler):
 
284
 
 
285
            protocol_version = 'HTTP/1.0'
 
286
 
 
287
        server = http_server.HttpServer(RequestHandlerOneZero,
 
288
                                        protocol_version='HTTP/1.1')
 
289
        server.setUp()
 
290
        self.addCleanup(server.tearDown)
 
291
        self.assertIsInstance(server._httpd,
 
292
                              http_server.TestingThreadingHTTPServer)
 
293
 
 
294
    def test_create_http_server_force_one_zero(self):
 
295
        class RequestHandlerOneOne(http_server.TestingHTTPRequestHandler):
 
296
 
 
297
            protocol_version = 'HTTP/1.1'
 
298
 
 
299
        server = http_server.HttpServer(RequestHandlerOneOne,
 
300
                                        protocol_version='HTTP/1.0')
 
301
        server.setUp()
 
302
        self.addCleanup(server.tearDown)
 
303
        self.assertIsInstance(server._httpd,
 
304
                              http_server.TestingHTTPServer)
 
305
 
 
306
 
 
307
class TestWithTransport_pycurl(object):
 
308
    """Test case to inherit from if pycurl is present"""
 
309
 
 
310
    def _get_pycurl_maybe(self):
 
311
        try:
 
312
            from bzrlib.transport.http._pycurl import PyCurlTransport
 
313
            return PyCurlTransport
 
314
        except errors.DependencyNotPresent:
 
315
            raise tests.TestSkipped('pycurl not present')
 
316
 
 
317
    _transport = property(_get_pycurl_maybe)
 
318
 
 
319
 
 
320
class TestHttpUrls(tests.TestCase):
 
321
 
 
322
    # TODO: This should be moved to authorization tests once they
 
323
    # are written.
 
324
 
 
325
    def test_url_parsing(self):
 
326
        f = FakeManager()
 
327
        url = http.extract_auth('http://example.com', f)
 
328
        self.assertEquals('http://example.com', url)
 
329
        self.assertEquals(0, len(f.credentials))
 
330
        url = http.extract_auth(
 
331
            'http://user:pass@www.bazaar-vcs.org/bzr/bzr.dev', f)
 
332
        self.assertEquals('http://www.bazaar-vcs.org/bzr/bzr.dev', url)
 
333
        self.assertEquals(1, len(f.credentials))
 
334
        self.assertEquals([None, 'www.bazaar-vcs.org', 'user', 'pass'],
 
335
                          f.credentials[0])
 
336
 
 
337
 
 
338
class TestHttpTransportUrls(tests.TestCase):
 
339
    """Test the http urls."""
 
340
 
 
341
    def test_abs_url(self):
 
342
        """Construction of absolute http URLs"""
 
343
        t = self._transport('http://bazaar-vcs.org/bzr/bzr.dev/')
 
344
        eq = self.assertEqualDiff
 
345
        eq(t.abspath('.'), 'http://bazaar-vcs.org/bzr/bzr.dev')
 
346
        eq(t.abspath('foo/bar'), 'http://bazaar-vcs.org/bzr/bzr.dev/foo/bar')
 
347
        eq(t.abspath('.bzr'), 'http://bazaar-vcs.org/bzr/bzr.dev/.bzr')
 
348
        eq(t.abspath('.bzr/1//2/./3'),
 
349
           'http://bazaar-vcs.org/bzr/bzr.dev/.bzr/1/2/3')
 
350
 
 
351
    def test_invalid_http_urls(self):
 
352
        """Trap invalid construction of urls"""
 
353
        t = self._transport('http://bazaar-vcs.org/bzr/bzr.dev/')
 
354
        self.assertRaises(errors.InvalidURL,
 
355
                          self._transport,
 
356
                          'http://http://bazaar-vcs.org/bzr/bzr.dev/')
 
357
 
 
358
    def test_http_root_urls(self):
 
359
        """Construction of URLs from server root"""
 
360
        t = self._transport('http://bzr.ozlabs.org/')
 
361
        eq = self.assertEqualDiff
 
362
        eq(t.abspath('.bzr/tree-version'),
 
363
           'http://bzr.ozlabs.org/.bzr/tree-version')
 
364
 
 
365
    def test_http_impl_urls(self):
 
366
        """There are servers which ask for particular clients to connect"""
 
367
        server = self._server()
 
368
        try:
 
369
            server.setUp()
 
370
            url = server.get_url()
 
371
            self.assertTrue(url.startswith('%s://' % self._qualified_prefix))
 
372
        finally:
 
373
            server.tearDown()
 
374
 
 
375
 
 
376
class TestHttps_pycurl(TestWithTransport_pycurl, tests.TestCase):
 
377
 
 
378
    # TODO: This should really be moved into another pycurl
 
379
    # specific test. When https tests will be implemented, take
 
380
    # this one into account.
 
381
    def test_pycurl_without_https_support(self):
 
382
        """Test that pycurl without SSL do not fail with a traceback.
 
383
 
 
384
        For the purpose of the test, we force pycurl to ignore
 
385
        https by supplying a fake version_info that do not
 
386
        support it.
 
387
        """
 
388
        try:
 
389
            import pycurl
 
390
        except ImportError:
 
391
            raise tests.TestSkipped('pycurl not present')
 
392
 
 
393
        version_info_orig = pycurl.version_info
 
394
        try:
 
395
            # Now that we have pycurl imported, we can fake its version_info
 
396
            # This was taken from a windows pycurl without SSL
 
397
            # (thanks to bialix)
 
398
            pycurl.version_info = lambda : (2,
 
399
                                            '7.13.2',
 
400
                                            462082,
 
401
                                            'i386-pc-win32',
 
402
                                            2576,
 
403
                                            None,
 
404
                                            0,
 
405
                                            None,
 
406
                                            ('ftp', 'gopher', 'telnet',
 
407
                                             'dict', 'ldap', 'http', 'file'),
 
408
                                            None,
 
409
                                            0,
 
410
                                            None)
 
411
            self.assertRaises(errors.DependencyNotPresent, self._transport,
 
412
                              'https://launchpad.net')
 
413
        finally:
 
414
            # Restore the right function
 
415
            pycurl.version_info = version_info_orig
 
416
 
 
417
 
 
418
class TestHTTPConnections(http_utils.TestCaseWithWebserver):
 
419
    """Test the http connections."""
 
420
 
 
421
    def setUp(self):
 
422
        http_utils.TestCaseWithWebserver.setUp(self)
 
423
        self.build_tree(['foo/', 'foo/bar'], line_endings='binary',
 
424
                        transport=self.get_transport())
 
425
 
 
426
    def test_http_has(self):
 
427
        server = self.get_readonly_server()
 
428
        t = self._transport(server.get_url())
 
429
        self.assertEqual(t.has('foo/bar'), True)
 
430
        self.assertEqual(len(server.logs), 1)
 
431
        self.assertContainsRe(server.logs[0],
 
432
            r'"HEAD /foo/bar HTTP/1.." (200|302) - "-" "bzr/')
 
433
 
 
434
    def test_http_has_not_found(self):
 
435
        server = self.get_readonly_server()
 
436
        t = self._transport(server.get_url())
 
437
        self.assertEqual(t.has('not-found'), False)
 
438
        self.assertContainsRe(server.logs[1],
 
439
            r'"HEAD /not-found HTTP/1.." 404 - "-" "bzr/')
 
440
 
 
441
    def test_http_get(self):
 
442
        server = self.get_readonly_server()
 
443
        t = self._transport(server.get_url())
 
444
        fp = t.get('foo/bar')
 
445
        self.assertEqualDiff(
 
446
            fp.read(),
 
447
            'contents of foo/bar\n')
 
448
        self.assertEqual(len(server.logs), 1)
 
449
        self.assertTrue(server.logs[0].find(
 
450
            '"GET /foo/bar HTTP/1.1" 200 - "-" "bzr/%s'
 
451
            % bzrlib.__version__) > -1)
 
452
 
 
453
    def test_has_on_bogus_host(self):
 
454
        # Get a free address and don't 'accept' on it, so that we
 
455
        # can be sure there is no http handler there, but set a
 
456
        # reasonable timeout to not slow down tests too much.
 
457
        default_timeout = socket.getdefaulttimeout()
 
458
        try:
 
459
            socket.setdefaulttimeout(2)
 
460
            s = socket.socket()
 
461
            s.bind(('localhost', 0))
 
462
            t = self._transport('http://%s:%s/' % s.getsockname())
 
463
            self.assertRaises(errors.ConnectionError, t.has, 'foo/bar')
 
464
        finally:
 
465
            socket.setdefaulttimeout(default_timeout)
 
466
 
 
467
 
 
468
class TestHttpTransportRegistration(tests.TestCase):
 
469
    """Test registrations of various http implementations"""
 
470
 
 
471
    def test_http_registered(self):
 
472
        t = transport.get_transport('%s://foo.com/' % self._qualified_prefix)
 
473
        self.assertIsInstance(t, transport.Transport)
 
474
        self.assertIsInstance(t, self._transport)
 
475
 
 
476
 
 
477
class TestPost(tests.TestCase):
 
478
 
 
479
    def test_post_body_is_received(self):
 
480
        server = RecordingServer(expect_body_tail='end-of-body')
 
481
        server.setUp()
 
482
        self.addCleanup(server.tearDown)
 
483
        scheme = self._qualified_prefix
 
484
        url = '%s://%s:%s/' % (scheme, server.host, server.port)
 
485
        http_transport = self._transport(url)
 
486
        code, response = http_transport._post('abc def end-of-body')
 
487
        self.assertTrue(
 
488
            server.received_bytes.startswith('POST /.bzr/smart HTTP/1.'))
 
489
        self.assertTrue('content-length: 19\r' in server.received_bytes.lower())
 
490
        # The transport should not be assuming that the server can accept
 
491
        # chunked encoding the first time it connects, because HTTP/1.1, so we
 
492
        # check for the literal string.
 
493
        self.assertTrue(
 
494
            server.received_bytes.endswith('\r\n\r\nabc def end-of-body'))
 
495
 
 
496
 
 
497
class TestRangeHeader(tests.TestCase):
 
498
    """Test range_header method"""
 
499
 
 
500
    def check_header(self, value, ranges=[], tail=0):
 
501
        offsets = [ (start, end - start + 1) for start, end in ranges]
 
502
        coalesce = transport.Transport._coalesce_offsets
 
503
        coalesced = list(coalesce(offsets, limit=0, fudge_factor=0))
 
504
        range_header = http.HttpTransportBase._range_header
 
505
        self.assertEqual(value, range_header(coalesced, tail))
 
506
 
 
507
    def test_range_header_single(self):
 
508
        self.check_header('0-9', ranges=[(0,9)])
 
509
        self.check_header('100-109', ranges=[(100,109)])
 
510
 
 
511
    def test_range_header_tail(self):
 
512
        self.check_header('-10', tail=10)
 
513
        self.check_header('-50', tail=50)
 
514
 
 
515
    def test_range_header_multi(self):
 
516
        self.check_header('0-9,100-200,300-5000',
 
517
                          ranges=[(0,9), (100, 200), (300,5000)])
 
518
 
 
519
    def test_range_header_mixed(self):
 
520
        self.check_header('0-9,300-5000,-50',
 
521
                          ranges=[(0,9), (300,5000)],
 
522
                          tail=50)
 
523
 
 
524
 
 
525
class TestSpecificRequestHandler(http_utils.TestCaseWithWebserver):
 
526
    """Tests a specific request handler.
 
527
 
 
528
    Daughter classes are expected to override _req_handler_class
 
529
    """
 
530
 
 
531
    # Provide a useful default
 
532
    _req_handler_class = http_server.TestingHTTPRequestHandler
 
533
 
 
534
    def create_transport_readonly_server(self):
 
535
        return http_server.HttpServer(self._req_handler_class,
 
536
                                      protocol_version=self._protocol_version)
 
537
 
 
538
    def _testing_pycurl(self):
 
539
        return pycurl_present and self._transport == PyCurlTransport
 
540
 
 
541
 
 
542
class WallRequestHandler(http_server.TestingHTTPRequestHandler):
 
543
    """Whatever request comes in, close the connection"""
 
544
 
 
545
    def handle_one_request(self):
 
546
        """Handle a single HTTP request, by abruptly closing the connection"""
 
547
        self.close_connection = 1
 
548
 
 
549
 
 
550
class TestWallServer(TestSpecificRequestHandler):
 
551
    """Tests exceptions during the connection phase"""
 
552
 
 
553
    _req_handler_class = WallRequestHandler
 
554
 
 
555
    def test_http_has(self):
 
556
        server = self.get_readonly_server()
 
557
        t = self._transport(server.get_url())
 
558
        # Unfortunately httplib (see HTTPResponse._read_status
 
559
        # for details) make no distinction between a closed
 
560
        # socket and badly formatted status line, so we can't
 
561
        # just test for ConnectionError, we have to test
 
562
        # InvalidHttpResponse too.
 
563
        self.assertRaises((errors.ConnectionError, errors.InvalidHttpResponse),
 
564
                          t.has, 'foo/bar')
 
565
 
 
566
    def test_http_get(self):
 
567
        server = self.get_readonly_server()
 
568
        t = self._transport(server.get_url())
 
569
        self.assertRaises((errors.ConnectionError, errors.InvalidHttpResponse),
 
570
                          t.get, 'foo/bar')
 
571
 
 
572
 
 
573
class BadStatusRequestHandler(http_server.TestingHTTPRequestHandler):
 
574
    """Whatever request comes in, returns a bad status"""
 
575
 
 
576
    def parse_request(self):
 
577
        """Fakes handling a single HTTP request, returns a bad status"""
 
578
        ignored = http_server.TestingHTTPRequestHandler.parse_request(self)
 
579
        self.send_response(0, "Bad status")
 
580
        self.close_connection = 1
 
581
        return False
 
582
 
 
583
 
 
584
class TestBadStatusServer(TestSpecificRequestHandler):
 
585
    """Tests bad status from server."""
 
586
 
 
587
    _req_handler_class = BadStatusRequestHandler
 
588
 
 
589
    def test_http_has(self):
 
590
        server = self.get_readonly_server()
 
591
        t = self._transport(server.get_url())
 
592
        self.assertRaises(errors.InvalidHttpResponse, t.has, 'foo/bar')
 
593
 
 
594
    def test_http_get(self):
 
595
        server = self.get_readonly_server()
 
596
        t = self._transport(server.get_url())
 
597
        self.assertRaises(errors.InvalidHttpResponse, t.get, 'foo/bar')
 
598
 
 
599
 
 
600
class InvalidStatusRequestHandler(http_server.TestingHTTPRequestHandler):
 
601
    """Whatever request comes in, returns an invalid status"""
 
602
 
 
603
    def parse_request(self):
 
604
        """Fakes handling a single HTTP request, returns a bad status"""
 
605
        ignored = http_server.TestingHTTPRequestHandler.parse_request(self)
 
606
        self.wfile.write("Invalid status line\r\n")
 
607
        return False
 
608
 
 
609
 
 
610
class TestInvalidStatusServer(TestBadStatusServer):
 
611
    """Tests invalid status from server.
 
612
 
 
613
    Both implementations raises the same error as for a bad status.
 
614
    """
 
615
 
 
616
    _req_handler_class = InvalidStatusRequestHandler
 
617
 
 
618
    def test_http_has(self):
 
619
        if self._testing_pycurl() and self._protocol_version == 'HTTP/1.1':
 
620
            raise tests.KnownFailure(
 
621
                'pycurl hangs if the server send back garbage')
 
622
        super(TestInvalidStatusServer, self).test_http_has()
 
623
 
 
624
    def test_http_get(self):
 
625
        if self._testing_pycurl() and self._protocol_version == 'HTTP/1.1':
 
626
            raise tests.KnownFailure(
 
627
                'pycurl hangs if the server send back garbage')
 
628
        super(TestInvalidStatusServer, self).test_http_get()
 
629
 
 
630
 
 
631
class BadProtocolRequestHandler(http_server.TestingHTTPRequestHandler):
 
632
    """Whatever request comes in, returns a bad protocol version"""
 
633
 
 
634
    def parse_request(self):
 
635
        """Fakes handling a single HTTP request, returns a bad status"""
 
636
        ignored = http_server.TestingHTTPRequestHandler.parse_request(self)
 
637
        # Returns an invalid protocol version, but curl just
 
638
        # ignores it and those cannot be tested.
 
639
        self.wfile.write("%s %d %s\r\n" % ('HTTP/0.0',
 
640
                                           404,
 
641
                                           'Look at my protocol version'))
 
642
        return False
 
643
 
 
644
 
 
645
class TestBadProtocolServer(TestSpecificRequestHandler):
 
646
    """Tests bad protocol from server."""
 
647
 
 
648
    _req_handler_class = BadProtocolRequestHandler
 
649
 
 
650
    def setUp(self):
 
651
        if pycurl_present and self._transport == PyCurlTransport:
 
652
            raise tests.TestNotApplicable(
 
653
                "pycurl doesn't check the protocol version")
 
654
        super(TestBadProtocolServer, self).setUp()
 
655
 
 
656
    def test_http_has(self):
 
657
        server = self.get_readonly_server()
 
658
        t = self._transport(server.get_url())
 
659
        self.assertRaises(errors.InvalidHttpResponse, t.has, 'foo/bar')
 
660
 
 
661
    def test_http_get(self):
 
662
        server = self.get_readonly_server()
 
663
        t = self._transport(server.get_url())
 
664
        self.assertRaises(errors.InvalidHttpResponse, t.get, 'foo/bar')
 
665
 
 
666
 
 
667
class ForbiddenRequestHandler(http_server.TestingHTTPRequestHandler):
 
668
    """Whatever request comes in, returns a 403 code"""
 
669
 
 
670
    def parse_request(self):
 
671
        """Handle a single HTTP request, by replying we cannot handle it"""
 
672
        ignored = http_server.TestingHTTPRequestHandler.parse_request(self)
 
673
        self.send_error(403)
 
674
        return False
 
675
 
 
676
 
 
677
class TestForbiddenServer(TestSpecificRequestHandler):
 
678
    """Tests forbidden server"""
 
679
 
 
680
    _req_handler_class = ForbiddenRequestHandler
 
681
 
 
682
    def test_http_has(self):
 
683
        server = self.get_readonly_server()
 
684
        t = self._transport(server.get_url())
 
685
        self.assertRaises(errors.TransportError, t.has, 'foo/bar')
 
686
 
 
687
    def test_http_get(self):
 
688
        server = self.get_readonly_server()
 
689
        t = self._transport(server.get_url())
 
690
        self.assertRaises(errors.TransportError, t.get, 'foo/bar')
 
691
 
 
692
 
 
693
class TestRecordingServer(tests.TestCase):
 
694
 
 
695
    def test_create(self):
 
696
        server = RecordingServer(expect_body_tail=None)
 
697
        self.assertEqual('', server.received_bytes)
 
698
        self.assertEqual(None, server.host)
 
699
        self.assertEqual(None, server.port)
 
700
 
 
701
    def test_setUp_and_tearDown(self):
 
702
        server = RecordingServer(expect_body_tail=None)
 
703
        server.setUp()
 
704
        try:
 
705
            self.assertNotEqual(None, server.host)
 
706
            self.assertNotEqual(None, server.port)
 
707
        finally:
 
708
            server.tearDown()
 
709
        self.assertEqual(None, server.host)
 
710
        self.assertEqual(None, server.port)
 
711
 
 
712
    def test_send_receive_bytes(self):
 
713
        server = RecordingServer(expect_body_tail='c')
 
714
        server.setUp()
 
715
        self.addCleanup(server.tearDown)
 
716
        sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
 
717
        sock.connect((server.host, server.port))
 
718
        sock.sendall('abc')
 
719
        self.assertEqual('HTTP/1.1 200 OK\r\n',
 
720
                         osutils.recv_all(sock, 4096))
 
721
        self.assertEqual('abc', server.received_bytes)
 
722
 
 
723
 
 
724
class TestRangeRequestServer(TestSpecificRequestHandler):
 
725
    """Tests readv requests against server.
 
726
 
 
727
    We test against default "normal" server.
 
728
    """
 
729
 
 
730
    def setUp(self):
 
731
        super(TestRangeRequestServer, self).setUp()
 
732
        self.build_tree_contents([('a', '0123456789')],)
 
733
 
 
734
    def test_readv(self):
 
735
        server = self.get_readonly_server()
 
736
        t = self._transport(server.get_url())
 
737
        l = list(t.readv('a', ((0, 1), (1, 1), (3, 2), (9, 1))))
 
738
        self.assertEqual(l[0], (0, '0'))
 
739
        self.assertEqual(l[1], (1, '1'))
 
740
        self.assertEqual(l[2], (3, '34'))
 
741
        self.assertEqual(l[3], (9, '9'))
 
742
 
 
743
    def test_readv_out_of_order(self):
 
744
        server = self.get_readonly_server()
 
745
        t = self._transport(server.get_url())
 
746
        l = list(t.readv('a', ((1, 1), (9, 1), (0, 1), (3, 2))))
 
747
        self.assertEqual(l[0], (1, '1'))
 
748
        self.assertEqual(l[1], (9, '9'))
 
749
        self.assertEqual(l[2], (0, '0'))
 
750
        self.assertEqual(l[3], (3, '34'))
 
751
 
 
752
    def test_readv_invalid_ranges(self):
 
753
        server = self.get_readonly_server()
 
754
        t = self._transport(server.get_url())
 
755
 
 
756
        # This is intentionally reading off the end of the file
 
757
        # since we are sure that it cannot get there
 
758
        self.assertListRaises((errors.InvalidRange, errors.ShortReadvError,),
 
759
                              t.readv, 'a', [(1,1), (8,10)])
 
760
 
 
761
        # This is trying to seek past the end of the file, it should
 
762
        # also raise a special error
 
763
        self.assertListRaises((errors.InvalidRange, errors.ShortReadvError,),
 
764
                              t.readv, 'a', [(12,2)])
 
765
 
 
766
    def test_readv_multiple_get_requests(self):
 
767
        server = self.get_readonly_server()
 
768
        t = self._transport(server.get_url())
 
769
        # force transport to issue multiple requests
 
770
        t._max_readv_combine = 1
 
771
        t._max_get_ranges = 1
 
772
        l = list(t.readv('a', ((0, 1), (1, 1), (3, 2), (9, 1))))
 
773
        self.assertEqual(l[0], (0, '0'))
 
774
        self.assertEqual(l[1], (1, '1'))
 
775
        self.assertEqual(l[2], (3, '34'))
 
776
        self.assertEqual(l[3], (9, '9'))
 
777
        # The server should have issued 4 requests
 
778
        self.assertEqual(4, server.GET_request_nb)
 
779
 
 
780
    def test_readv_get_max_size(self):
 
781
        server = self.get_readonly_server()
 
782
        t = self._transport(server.get_url())
 
783
        # force transport to issue multiple requests by limiting the number of
 
784
        # bytes by request. Note that this apply to coalesced offsets only, a
 
785
        # single range will keep its size even if bigger than the limit.
 
786
        t._get_max_size = 2
 
787
        l = list(t.readv('a', ((0, 1), (1, 1), (2, 4), (6, 4))))
 
788
        self.assertEqual(l[0], (0, '0'))
 
789
        self.assertEqual(l[1], (1, '1'))
 
790
        self.assertEqual(l[2], (2, '2345'))
 
791
        self.assertEqual(l[3], (6, '6789'))
 
792
        # The server should have issued 3 requests
 
793
        self.assertEqual(3, server.GET_request_nb)
 
794
 
 
795
    def test_complete_readv_leave_pipe_clean(self):
 
796
        server = self.get_readonly_server()
 
797
        t = self._transport(server.get_url())
 
798
        # force transport to issue multiple requests
 
799
        t._get_max_size = 2
 
800
        l = list(t.readv('a', ((0, 1), (1, 1), (2, 4), (6, 4))))
 
801
        # The server should have issued 3 requests
 
802
        self.assertEqual(3, server.GET_request_nb)
 
803
        self.assertEqual('0123456789', t.get_bytes('a'))
 
804
        self.assertEqual(4, server.GET_request_nb)
 
805
 
 
806
    def test_incomplete_readv_leave_pipe_clean(self):
 
807
        server = self.get_readonly_server()
 
808
        t = self._transport(server.get_url())
 
809
        # force transport to issue multiple requests
 
810
        t._get_max_size = 2
 
811
        # Don't collapse readv results into a list so that we leave unread
 
812
        # bytes on the socket
 
813
        ireadv = iter(t.readv('a', ((0, 1), (1, 1), (2, 4), (6, 4))))
 
814
        self.assertEqual((0, '0'), ireadv.next())
 
815
        # The server should have issued one request so far 
 
816
        self.assertEqual(1, server.GET_request_nb)
 
817
        self.assertEqual('0123456789', t.get_bytes('a'))
 
818
        # get_bytes issued an additional request, the readv pending ones are
 
819
        # lost
 
820
        self.assertEqual(2, server.GET_request_nb)
 
821
 
 
822
 
 
823
class SingleRangeRequestHandler(http_server.TestingHTTPRequestHandler):
 
824
    """Always reply to range request as if they were single.
 
825
 
 
826
    Don't be explicit about it, just to annoy the clients.
 
827
    """
 
828
 
 
829
    def get_multiple_ranges(self, file, file_size, ranges):
 
830
        """Answer as if it was a single range request and ignores the rest"""
 
831
        (start, end) = ranges[0]
 
832
        return self.get_single_range(file, file_size, start, end)
 
833
 
 
834
 
 
835
class TestSingleRangeRequestServer(TestRangeRequestServer):
 
836
    """Test readv against a server which accept only single range requests"""
 
837
 
 
838
    _req_handler_class = SingleRangeRequestHandler
 
839
 
 
840
 
 
841
class SingleOnlyRangeRequestHandler(http_server.TestingHTTPRequestHandler):
 
842
    """Only reply to simple range requests, errors out on multiple"""
 
843
 
 
844
    def get_multiple_ranges(self, file, file_size, ranges):
 
845
        """Refuses the multiple ranges request"""
 
846
        if len(ranges) > 1:
 
847
            file.close()
 
848
            self.send_error(416, "Requested range not satisfiable")
 
849
            return
 
850
        (start, end) = ranges[0]
 
851
        return self.get_single_range(file, file_size, start, end)
 
852
 
 
853
 
 
854
class TestSingleOnlyRangeRequestServer(TestRangeRequestServer):
 
855
    """Test readv against a server which only accept single range requests"""
 
856
 
 
857
    _req_handler_class = SingleOnlyRangeRequestHandler
 
858
 
 
859
 
 
860
class NoRangeRequestHandler(http_server.TestingHTTPRequestHandler):
 
861
    """Ignore range requests without notice"""
 
862
 
 
863
    def do_GET(self):
 
864
        # Update the statistics
 
865
        self.server.test_case_server.GET_request_nb += 1
 
866
        # Just bypass the range handling done by TestingHTTPRequestHandler
 
867
        return SimpleHTTPServer.SimpleHTTPRequestHandler.do_GET(self)
 
868
 
 
869
 
 
870
class TestNoRangeRequestServer(TestRangeRequestServer):
 
871
    """Test readv against a server which do not accept range requests"""
 
872
 
 
873
    _req_handler_class = NoRangeRequestHandler
 
874
 
 
875
 
 
876
class MultipleRangeWithoutContentLengthRequestHandler(
 
877
    http_server.TestingHTTPRequestHandler):
 
878
    """Reply to multiple range requests without content length header."""
 
879
 
 
880
    def get_multiple_ranges(self, file, file_size, ranges):
 
881
        self.send_response(206)
 
882
        self.send_header('Accept-Ranges', 'bytes')
 
883
        boundary = "%d" % random.randint(0,0x7FFFFFFF)
 
884
        self.send_header("Content-Type",
 
885
                         "multipart/byteranges; boundary=%s" % boundary)
 
886
        self.end_headers()
 
887
        for (start, end) in ranges:
 
888
            self.wfile.write("--%s\r\n" % boundary)
 
889
            self.send_header("Content-type", 'application/octet-stream')
 
890
            self.send_header("Content-Range", "bytes %d-%d/%d" % (start,
 
891
                                                                  end,
 
892
                                                                  file_size))
 
893
            self.end_headers()
 
894
            self.send_range_content(file, start, end - start + 1)
 
895
        # Final boundary
 
896
        self.wfile.write("--%s\r\n" % boundary)
 
897
 
 
898
 
 
899
class TestMultipleRangeWithoutContentLengthServer(TestRangeRequestServer):
 
900
 
 
901
    _req_handler_class = MultipleRangeWithoutContentLengthRequestHandler
 
902
 
 
903
 
 
904
class TruncatedMultipleRangeRequestHandler(
 
905
    http_server.TestingHTTPRequestHandler):
 
906
    """Reply to multiple range requests truncating the last ones.
 
907
 
 
908
    This server generates responses whose Content-Length describes all the
 
909
    ranges, but fail to include the last ones leading to client short reads.
 
910
    This has been observed randomly with lighttpd (bug #179368).
 
911
    """
 
912
 
 
913
    _truncated_ranges = 2
 
914
 
 
915
    def get_multiple_ranges(self, file, file_size, ranges):
 
916
        self.send_response(206)
 
917
        self.send_header('Accept-Ranges', 'bytes')
 
918
        boundary = 'tagada'
 
919
        self.send_header('Content-Type',
 
920
                         'multipart/byteranges; boundary=%s' % boundary)
 
921
        boundary_line = '--%s\r\n' % boundary
 
922
        # Calculate the Content-Length
 
923
        content_length = 0
 
924
        for (start, end) in ranges:
 
925
            content_length += len(boundary_line)
 
926
            content_length += self._header_line_length(
 
927
                'Content-type', 'application/octet-stream')
 
928
            content_length += self._header_line_length(
 
929
                'Content-Range', 'bytes %d-%d/%d' % (start, end, file_size))
 
930
            content_length += len('\r\n') # end headers
 
931
            content_length += end - start # + 1
 
932
        content_length += len(boundary_line)
 
933
        self.send_header('Content-length', content_length)
 
934
        self.end_headers()
 
935
 
 
936
        # Send the multipart body
 
937
        cur = 0
 
938
        for (start, end) in ranges:
 
939
            self.wfile.write(boundary_line)
 
940
            self.send_header('Content-type', 'application/octet-stream')
 
941
            self.send_header('Content-Range', 'bytes %d-%d/%d'
 
942
                             % (start, end, file_size))
 
943
            self.end_headers()
 
944
            if cur + self._truncated_ranges >= len(ranges):
 
945
                # Abruptly ends the response and close the connection
 
946
                self.close_connection = 1
 
947
                return
 
948
            self.send_range_content(file, start, end - start + 1)
 
949
            cur += 1
 
950
        # No final boundary
 
951
        self.wfile.write(boundary_line)
 
952
 
 
953
 
 
954
class TestTruncatedMultipleRangeServer(TestSpecificRequestHandler):
 
955
 
 
956
    _req_handler_class = TruncatedMultipleRangeRequestHandler
 
957
 
 
958
    def setUp(self):
 
959
        super(TestTruncatedMultipleRangeServer, self).setUp()
 
960
        self.build_tree_contents([('a', '0123456789')],)
 
961
 
 
962
    def test_readv_with_short_reads(self):
 
963
        server = self.get_readonly_server()
 
964
        t = self._transport(server.get_url())
 
965
        # Force separate ranges for each offset
 
966
        t._bytes_to_read_before_seek = 0
 
967
        ireadv = iter(t.readv('a', ((0, 1), (2, 1), (4, 2), (9, 1))))
 
968
        self.assertEqual((0, '0'), ireadv.next())
 
969
        self.assertEqual((2, '2'), ireadv.next())
 
970
        if not self._testing_pycurl():
 
971
            # Only one request have been issued so far (except for pycurl that
 
972
            # try to read the whole response at once)
 
973
            self.assertEqual(1, server.GET_request_nb)
 
974
        self.assertEqual((4, '45'), ireadv.next())
 
975
        self.assertEqual((9, '9'), ireadv.next())
 
976
        # Both implementations issue 3 requests but:
 
977
        # - urllib does two multiple (4 ranges, then 2 ranges) then a single
 
978
        #   range,
 
979
        # - pycurl does two multiple (4 ranges, 4 ranges) then a single range
 
980
        self.assertEqual(3, server.GET_request_nb)
 
981
        # Finally the client have tried a single range request and stays in
 
982
        # that mode
 
983
        self.assertEqual('single', t._range_hint)
 
984
 
 
985
class LimitedRangeRequestHandler(http_server.TestingHTTPRequestHandler):
 
986
    """Errors out when range specifiers exceed the limit"""
 
987
 
 
988
    def get_multiple_ranges(self, file, file_size, ranges):
 
989
        """Refuses the multiple ranges request"""
 
990
        tcs = self.server.test_case_server
 
991
        if tcs.range_limit is not None and len(ranges) > tcs.range_limit:
 
992
            file.close()
 
993
            # Emulate apache behavior
 
994
            self.send_error(400, "Bad Request")
 
995
            return
 
996
        return http_server.TestingHTTPRequestHandler.get_multiple_ranges(
 
997
            self, file, file_size, ranges)
 
998
 
 
999
 
 
1000
class LimitedRangeHTTPServer(http_server.HttpServer):
 
1001
    """An HttpServer erroring out on requests with too much range specifiers"""
 
1002
 
 
1003
    def __init__(self, request_handler=LimitedRangeRequestHandler,
 
1004
                 protocol_version=None,
 
1005
                 range_limit=None):
 
1006
        http_server.HttpServer.__init__(self, request_handler,
 
1007
                                        protocol_version=protocol_version)
 
1008
        self.range_limit = range_limit
 
1009
 
 
1010
 
 
1011
class TestLimitedRangeRequestServer(http_utils.TestCaseWithWebserver):
 
1012
    """Tests readv requests against a server erroring out on too much ranges."""
 
1013
 
 
1014
    # Requests with more range specifiers will error out
 
1015
    range_limit = 3
 
1016
 
 
1017
    def create_transport_readonly_server(self):
 
1018
        return LimitedRangeHTTPServer(range_limit=self.range_limit,
 
1019
                                      protocol_version=self._protocol_version)
 
1020
 
 
1021
    def get_transport(self):
 
1022
        return self._transport(self.get_readonly_server().get_url())
 
1023
 
 
1024
    def setUp(self):
 
1025
        http_utils.TestCaseWithWebserver.setUp(self)
 
1026
        # We need to manipulate ranges that correspond to real chunks in the
 
1027
        # response, so we build a content appropriately.
 
1028
        filler = ''.join(['abcdefghij' for x in range(102)])
 
1029
        content = ''.join(['%04d' % v + filler for v in range(16)])
 
1030
        self.build_tree_contents([('a', content)],)
 
1031
 
 
1032
    def test_few_ranges(self):
 
1033
        t = self.get_transport()
 
1034
        l = list(t.readv('a', ((0, 4), (1024, 4), )))
 
1035
        self.assertEqual(l[0], (0, '0000'))
 
1036
        self.assertEqual(l[1], (1024, '0001'))
 
1037
        self.assertEqual(1, self.get_readonly_server().GET_request_nb)
 
1038
 
 
1039
    def test_more_ranges(self):
 
1040
        t = self.get_transport()
 
1041
        l = list(t.readv('a', ((0, 4), (1024, 4), (4096, 4), (8192, 4))))
 
1042
        self.assertEqual(l[0], (0, '0000'))
 
1043
        self.assertEqual(l[1], (1024, '0001'))
 
1044
        self.assertEqual(l[2], (4096, '0004'))
 
1045
        self.assertEqual(l[3], (8192, '0008'))
 
1046
        # The server will refuse to serve the first request (too much ranges),
 
1047
        # a second request will succeed.
 
1048
        self.assertEqual(2, self.get_readonly_server().GET_request_nb)
 
1049
 
 
1050
 
 
1051
class TestHttpProxyWhiteBox(tests.TestCase):
 
1052
    """Whitebox test proxy http authorization.
 
1053
 
 
1054
    Only the urllib implementation is tested here.
 
1055
    """
 
1056
 
 
1057
    def setUp(self):
 
1058
        tests.TestCase.setUp(self)
 
1059
        self._old_env = {}
 
1060
 
 
1061
    def tearDown(self):
 
1062
        self._restore_env()
 
1063
        tests.TestCase.tearDown(self)
 
1064
 
 
1065
    def _install_env(self, env):
 
1066
        for name, value in env.iteritems():
 
1067
            self._old_env[name] = osutils.set_or_unset_env(name, value)
 
1068
 
 
1069
    def _restore_env(self):
 
1070
        for name, value in self._old_env.iteritems():
 
1071
            osutils.set_or_unset_env(name, value)
 
1072
 
 
1073
    def _proxied_request(self):
 
1074
        handler = _urllib2_wrappers.ProxyHandler()
 
1075
        request = _urllib2_wrappers.Request('GET','http://baz/buzzle')
 
1076
        handler.set_proxy(request, 'http')
 
1077
        return request
 
1078
 
 
1079
    def test_empty_user(self):
 
1080
        self._install_env({'http_proxy': 'http://bar.com'})
 
1081
        request = self._proxied_request()
 
1082
        self.assertFalse(request.headers.has_key('Proxy-authorization'))
 
1083
 
 
1084
    def test_invalid_proxy(self):
 
1085
        """A proxy env variable without scheme"""
 
1086
        self._install_env({'http_proxy': 'host:1234'})
 
1087
        self.assertRaises(errors.InvalidURL, self._proxied_request)
 
1088
 
 
1089
 
 
1090
class TestProxyHttpServer(http_utils.TestCaseWithTwoWebservers):
 
1091
    """Tests proxy server.
 
1092
 
 
1093
    Be aware that we do not setup a real proxy here. Instead, we
 
1094
    check that the *connection* goes through the proxy by serving
 
1095
    different content (the faked proxy server append '-proxied'
 
1096
    to the file names).
 
1097
    """
 
1098
 
 
1099
    # FIXME: We don't have an https server available, so we don't
 
1100
    # test https connections.
 
1101
 
 
1102
    def setUp(self):
 
1103
        super(TestProxyHttpServer, self).setUp()
 
1104
        self.build_tree_contents([('foo', 'contents of foo\n'),
 
1105
                                  ('foo-proxied', 'proxied contents of foo\n')])
 
1106
        # Let's setup some attributes for tests
 
1107
        self.server = self.get_readonly_server()
 
1108
        self.proxy_address = '%s:%d' % (self.server.host, self.server.port)
 
1109
        if self._testing_pycurl():
 
1110
            # Oh my ! pycurl does not check for the port as part of
 
1111
            # no_proxy :-( So we just test the host part
 
1112
            self.no_proxy_host = 'localhost'
 
1113
        else:
 
1114
            self.no_proxy_host = self.proxy_address
 
1115
        # The secondary server is the proxy
 
1116
        self.proxy = self.get_secondary_server()
 
1117
        self.proxy_url = self.proxy.get_url()
 
1118
        self._old_env = {}
 
1119
 
 
1120
    def _testing_pycurl(self):
 
1121
        return pycurl_present and self._transport == PyCurlTransport
 
1122
 
 
1123
    def create_transport_secondary_server(self):
 
1124
        """Creates an http server that will serve files with
 
1125
        '-proxied' appended to their names.
 
1126
        """
 
1127
        return http_utils.ProxyServer(protocol_version=self._protocol_version)
 
1128
 
 
1129
    def _install_env(self, env):
 
1130
        for name, value in env.iteritems():
 
1131
            self._old_env[name] = osutils.set_or_unset_env(name, value)
 
1132
 
 
1133
    def _restore_env(self):
 
1134
        for name, value in self._old_env.iteritems():
 
1135
            osutils.set_or_unset_env(name, value)
 
1136
 
 
1137
    def proxied_in_env(self, env):
 
1138
        self._install_env(env)
 
1139
        url = self.server.get_url()
 
1140
        t = self._transport(url)
 
1141
        try:
 
1142
            self.assertEqual('proxied contents of foo\n', t.get('foo').read())
 
1143
        finally:
 
1144
            self._restore_env()
 
1145
 
 
1146
    def not_proxied_in_env(self, env):
 
1147
        self._install_env(env)
 
1148
        url = self.server.get_url()
 
1149
        t = self._transport(url)
 
1150
        try:
 
1151
            self.assertEqual('contents of foo\n', t.get('foo').read())
 
1152
        finally:
 
1153
            self._restore_env()
 
1154
 
 
1155
    def test_http_proxy(self):
 
1156
        self.proxied_in_env({'http_proxy': self.proxy_url})
 
1157
 
 
1158
    def test_HTTP_PROXY(self):
 
1159
        if self._testing_pycurl():
 
1160
            # pycurl does not check HTTP_PROXY for security reasons
 
1161
            # (for use in a CGI context that we do not care
 
1162
            # about. Should we ?)
 
1163
            raise tests.TestNotApplicable(
 
1164
                'pycurl does not check HTTP_PROXY for security reasons')
 
1165
        self.proxied_in_env({'HTTP_PROXY': self.proxy_url})
 
1166
 
 
1167
    def test_all_proxy(self):
 
1168
        self.proxied_in_env({'all_proxy': self.proxy_url})
 
1169
 
 
1170
    def test_ALL_PROXY(self):
 
1171
        self.proxied_in_env({'ALL_PROXY': self.proxy_url})
 
1172
 
 
1173
    def test_http_proxy_with_no_proxy(self):
 
1174
        self.not_proxied_in_env({'http_proxy': self.proxy_url,
 
1175
                                 'no_proxy': self.no_proxy_host})
 
1176
 
 
1177
    def test_HTTP_PROXY_with_NO_PROXY(self):
 
1178
        if self._testing_pycurl():
 
1179
            raise tests.TestNotApplicable(
 
1180
                'pycurl does not check HTTP_PROXY for security reasons')
 
1181
        self.not_proxied_in_env({'HTTP_PROXY': self.proxy_url,
 
1182
                                 'NO_PROXY': self.no_proxy_host})
 
1183
 
 
1184
    def test_all_proxy_with_no_proxy(self):
 
1185
        self.not_proxied_in_env({'all_proxy': self.proxy_url,
 
1186
                                 'no_proxy': self.no_proxy_host})
 
1187
 
 
1188
    def test_ALL_PROXY_with_NO_PROXY(self):
 
1189
        self.not_proxied_in_env({'ALL_PROXY': self.proxy_url,
 
1190
                                 'NO_PROXY': self.no_proxy_host})
 
1191
 
 
1192
    def test_http_proxy_without_scheme(self):
 
1193
        if self._testing_pycurl():
 
1194
            # pycurl *ignores* invalid proxy env variables. If that ever change
 
1195
            # in the future, this test will fail indicating that pycurl do not
 
1196
            # ignore anymore such variables.
 
1197
            self.not_proxied_in_env({'http_proxy': self.proxy_address})
 
1198
        else:
 
1199
            self.assertRaises(errors.InvalidURL,
 
1200
                              self.proxied_in_env,
 
1201
                              {'http_proxy': self.proxy_address})
 
1202
 
 
1203
 
 
1204
class TestRanges(http_utils.TestCaseWithWebserver):
 
1205
    """Test the Range header in GET methods."""
 
1206
 
 
1207
    def setUp(self):
 
1208
        http_utils.TestCaseWithWebserver.setUp(self)
 
1209
        self.build_tree_contents([('a', '0123456789')],)
 
1210
        server = self.get_readonly_server()
 
1211
        self.transport = self._transport(server.get_url())
 
1212
 
 
1213
    def create_transport_readonly_server(self):
 
1214
        return http_server.HttpServer(protocol_version=self._protocol_version)
 
1215
 
 
1216
    def _file_contents(self, relpath, ranges):
 
1217
        offsets = [ (start, end - start + 1) for start, end in ranges]
 
1218
        coalesce = self.transport._coalesce_offsets
 
1219
        coalesced = list(coalesce(offsets, limit=0, fudge_factor=0))
 
1220
        code, data = self.transport._get(relpath, coalesced)
 
1221
        self.assertTrue(code in (200, 206),'_get returns: %d' % code)
 
1222
        for start, end in ranges:
 
1223
            data.seek(start)
 
1224
            yield data.read(end - start + 1)
 
1225
 
 
1226
    def _file_tail(self, relpath, tail_amount):
 
1227
        code, data = self.transport._get(relpath, [], tail_amount)
 
1228
        self.assertTrue(code in (200, 206),'_get returns: %d' % code)
 
1229
        data.seek(-tail_amount, 2)
 
1230
        return data.read(tail_amount)
 
1231
 
 
1232
    def test_range_header(self):
 
1233
        # Valid ranges
 
1234
        map(self.assertEqual,['0', '234'],
 
1235
            list(self._file_contents('a', [(0,0), (2,4)])),)
 
1236
 
 
1237
    def test_range_header_tail(self):
 
1238
        self.assertEqual('789', self._file_tail('a', 3))
 
1239
 
 
1240
    def test_syntactically_invalid_range_header(self):
 
1241
        self.assertListRaises(errors.InvalidHttpRange,
 
1242
                          self._file_contents, 'a', [(4, 3)])
 
1243
 
 
1244
    def test_semantically_invalid_range_header(self):
 
1245
        self.assertListRaises(errors.InvalidHttpRange,
 
1246
                          self._file_contents, 'a', [(42, 128)])
 
1247
 
 
1248
 
 
1249
class TestHTTPRedirections(http_utils.TestCaseWithRedirectedWebserver):
 
1250
    """Test redirection between http servers."""
 
1251
 
 
1252
    def create_transport_secondary_server(self):
 
1253
        """Create the secondary server redirecting to the primary server"""
 
1254
        new = self.get_readonly_server()
 
1255
 
 
1256
        redirecting = http_utils.HTTPServerRedirecting(
 
1257
            protocol_version=self._protocol_version)
 
1258
        redirecting.redirect_to(new.host, new.port)
 
1259
        return redirecting
 
1260
 
 
1261
    def setUp(self):
 
1262
        super(TestHTTPRedirections, self).setUp()
 
1263
        self.build_tree_contents([('a', '0123456789'),
 
1264
                                  ('bundle',
 
1265
                                  '# Bazaar revision bundle v0.9\n#\n')
 
1266
                                  ],)
 
1267
        # The requests to the old server will be redirected to the new server
 
1268
        self.old_transport = self._transport(self.old_server.get_url())
 
1269
 
 
1270
    def test_redirected(self):
 
1271
        self.assertRaises(errors.RedirectRequested, self.old_transport.get, 'a')
 
1272
        t = self._transport(self.new_server.get_url())
 
1273
        self.assertEqual('0123456789', t.get('a').read())
 
1274
 
 
1275
    def test_read_redirected_bundle_from_url(self):
 
1276
        from bzrlib.bundle import read_bundle_from_url
 
1277
        url = self.old_transport.abspath('bundle')
 
1278
        bundle = read_bundle_from_url(url)
 
1279
        # If read_bundle_from_url was successful we get an empty bundle
 
1280
        self.assertEqual([], bundle.revisions)
 
1281
 
 
1282
 
 
1283
class RedirectedRequest(_urllib2_wrappers.Request):
 
1284
    """Request following redirections. """
 
1285
 
 
1286
    init_orig = _urllib2_wrappers.Request.__init__
 
1287
 
 
1288
    def __init__(self, method, url, *args, **kwargs):
 
1289
        """Constructor.
 
1290
 
 
1291
        """
 
1292
        # Since the tests using this class will replace
 
1293
        # _urllib2_wrappers.Request, we can't just call the base class __init__
 
1294
        # or we'll loop.
 
1295
        RedirectedRequest.init_orig(self, method, url, args, kwargs)
 
1296
        self.follow_redirections = True
 
1297
 
 
1298
 
 
1299
class TestHTTPSilentRedirections(http_utils.TestCaseWithRedirectedWebserver):
 
1300
    """Test redirections.
 
1301
 
 
1302
    http implementations do not redirect silently anymore (they
 
1303
    do not redirect at all in fact). The mechanism is still in
 
1304
    place at the _urllib2_wrappers.Request level and these tests
 
1305
    exercise it.
 
1306
 
 
1307
    For the pycurl implementation
 
1308
    the redirection have been deleted as we may deprecate pycurl
 
1309
    and I have no place to keep a working implementation.
 
1310
    -- vila 20070212
 
1311
    """
 
1312
 
 
1313
    def setUp(self):
 
1314
        if pycurl_present and self._transport == PyCurlTransport:
 
1315
            raise tests.TestNotApplicable(
 
1316
                "pycurl doesn't redirect silently annymore")
 
1317
        super(TestHTTPSilentRedirections, self).setUp()
 
1318
        self.setup_redirected_request()
 
1319
        self.addCleanup(self.cleanup_redirected_request)
 
1320
        self.build_tree_contents([('a','a'),
 
1321
                                  ('1/',),
 
1322
                                  ('1/a', 'redirected once'),
 
1323
                                  ('2/',),
 
1324
                                  ('2/a', 'redirected twice'),
 
1325
                                  ('3/',),
 
1326
                                  ('3/a', 'redirected thrice'),
 
1327
                                  ('4/',),
 
1328
                                  ('4/a', 'redirected 4 times'),
 
1329
                                  ('5/',),
 
1330
                                  ('5/a', 'redirected 5 times'),
 
1331
                                  ],)
 
1332
 
 
1333
        self.old_transport = self._transport(self.old_server.get_url())
 
1334
 
 
1335
    def setup_redirected_request(self):
 
1336
        self.original_class = _urllib2_wrappers.Request
 
1337
        _urllib2_wrappers.Request = RedirectedRequest
 
1338
 
 
1339
    def cleanup_redirected_request(self):
 
1340
        _urllib2_wrappers.Request = self.original_class
 
1341
 
 
1342
    def create_transport_secondary_server(self):
 
1343
        """Create the secondary server, redirections are defined in the tests"""
 
1344
        return http_utils.HTTPServerRedirecting(
 
1345
            protocol_version=self._protocol_version)
 
1346
 
 
1347
    def test_one_redirection(self):
 
1348
        t = self.old_transport
 
1349
 
 
1350
        req = RedirectedRequest('GET', t.abspath('a'))
 
1351
        req.follow_redirections = True
 
1352
        new_prefix = 'http://%s:%s' % (self.new_server.host,
 
1353
                                       self.new_server.port)
 
1354
        self.old_server.redirections = \
 
1355
            [('(.*)', r'%s/1\1' % (new_prefix), 301),]
 
1356
        self.assertEquals('redirected once',t._perform(req).read())
 
1357
 
 
1358
    def test_five_redirections(self):
 
1359
        t = self.old_transport
 
1360
 
 
1361
        req = RedirectedRequest('GET', t.abspath('a'))
 
1362
        req.follow_redirections = True
 
1363
        old_prefix = 'http://%s:%s' % (self.old_server.host,
 
1364
                                       self.old_server.port)
 
1365
        new_prefix = 'http://%s:%s' % (self.new_server.host,
 
1366
                                       self.new_server.port)
 
1367
        self.old_server.redirections = [
 
1368
            ('/1(.*)', r'%s/2\1' % (old_prefix), 302),
 
1369
            ('/2(.*)', r'%s/3\1' % (old_prefix), 303),
 
1370
            ('/3(.*)', r'%s/4\1' % (old_prefix), 307),
 
1371
            ('/4(.*)', r'%s/5\1' % (new_prefix), 301),
 
1372
            ('(/[^/]+)', r'%s/1\1' % (old_prefix), 301),
 
1373
            ]
 
1374
        self.assertEquals('redirected 5 times',t._perform(req).read())
 
1375
 
 
1376
 
 
1377
class TestDoCatchRedirections(http_utils.TestCaseWithRedirectedWebserver):
 
1378
    """Test transport.do_catching_redirections."""
 
1379
 
 
1380
    def setUp(self):
 
1381
        super(TestDoCatchRedirections, self).setUp()
 
1382
        self.build_tree_contents([('a', '0123456789'),],)
 
1383
 
 
1384
        self.old_transport = self._transport(self.old_server.get_url())
 
1385
 
 
1386
    def get_a(self, transport):
 
1387
        return transport.get('a')
 
1388
 
 
1389
    def test_no_redirection(self):
 
1390
        t = self._transport(self.new_server.get_url())
 
1391
 
 
1392
        # We use None for redirected so that we fail if redirected
 
1393
        self.assertEquals('0123456789',
 
1394
                          transport.do_catching_redirections(
 
1395
                self.get_a, t, None).read())
 
1396
 
 
1397
    def test_one_redirection(self):
 
1398
        self.redirections = 0
 
1399
 
 
1400
        def redirected(transport, exception, redirection_notice):
 
1401
            self.redirections += 1
 
1402
            dir, file = urlutils.split(exception.target)
 
1403
            return self._transport(dir)
 
1404
 
 
1405
        self.assertEquals('0123456789',
 
1406
                          transport.do_catching_redirections(
 
1407
                self.get_a, self.old_transport, redirected).read())
 
1408
        self.assertEquals(1, self.redirections)
 
1409
 
 
1410
    def test_redirection_loop(self):
 
1411
 
 
1412
        def redirected(transport, exception, redirection_notice):
 
1413
            # By using the redirected url as a base dir for the
 
1414
            # *old* transport, we create a loop: a => a/a =>
 
1415
            # a/a/a
 
1416
            return self.old_transport.clone(exception.target)
 
1417
 
 
1418
        self.assertRaises(errors.TooManyRedirections,
 
1419
                          transport.do_catching_redirections,
 
1420
                          self.get_a, self.old_transport, redirected)
 
1421
 
 
1422
 
 
1423
class TestAuth(http_utils.TestCaseWithWebserver):
 
1424
    """Test authentication scheme"""
 
1425
 
 
1426
    _auth_header = 'Authorization'
 
1427
    _password_prompt_prefix = ''
 
1428
 
 
1429
    def setUp(self):
 
1430
        super(TestAuth, self).setUp()
 
1431
        self.server = self.get_readonly_server()
 
1432
        self.build_tree_contents([('a', 'contents of a\n'),
 
1433
                                  ('b', 'contents of b\n'),])
 
1434
 
 
1435
    def create_transport_readonly_server(self):
 
1436
        if self._auth_scheme == 'basic':
 
1437
            server = http_utils.HTTPBasicAuthServer(
 
1438
                protocol_version=self._protocol_version)
 
1439
        else:
 
1440
            if self._auth_scheme != 'digest':
 
1441
                raise AssertionError('Unknown auth scheme: %r'
 
1442
                                     % self._auth_scheme)
 
1443
            server = http_utils.HTTPDigestAuthServer(
 
1444
                protocol_version=self._protocol_version)
 
1445
        return server
 
1446
 
 
1447
    def _testing_pycurl(self):
 
1448
        return pycurl_present and self._transport == PyCurlTransport
 
1449
 
 
1450
    def get_user_url(self, user=None, password=None):
 
1451
        """Build an url embedding user and password"""
 
1452
        url = '%s://' % self.server._url_protocol
 
1453
        if user is not None:
 
1454
            url += user
 
1455
            if password is not None:
 
1456
                url += ':' + password
 
1457
            url += '@'
 
1458
        url += '%s:%s/' % (self.server.host, self.server.port)
 
1459
        return url
 
1460
 
 
1461
    def get_user_transport(self, user=None, password=None):
 
1462
        return self._transport(self.get_user_url(user, password))
 
1463
 
 
1464
    def test_no_user(self):
 
1465
        self.server.add_user('joe', 'foo')
 
1466
        t = self.get_user_transport()
 
1467
        self.assertRaises(errors.InvalidHttpResponse, t.get, 'a')
 
1468
        # Only one 'Authentication Required' error should occur
 
1469
        self.assertEqual(1, self.server.auth_required_errors)
 
1470
 
 
1471
    def test_empty_pass(self):
 
1472
        self.server.add_user('joe', '')
 
1473
        t = self.get_user_transport('joe', '')
 
1474
        self.assertEqual('contents of a\n', t.get('a').read())
 
1475
        # Only one 'Authentication Required' error should occur
 
1476
        self.assertEqual(1, self.server.auth_required_errors)
 
1477
 
 
1478
    def test_user_pass(self):
 
1479
        self.server.add_user('joe', 'foo')
 
1480
        t = self.get_user_transport('joe', 'foo')
 
1481
        self.assertEqual('contents of a\n', t.get('a').read())
 
1482
        # Only one 'Authentication Required' error should occur
 
1483
        self.assertEqual(1, self.server.auth_required_errors)
 
1484
 
 
1485
    def test_unknown_user(self):
 
1486
        self.server.add_user('joe', 'foo')
 
1487
        t = self.get_user_transport('bill', 'foo')
 
1488
        self.assertRaises(errors.InvalidHttpResponse, t.get, 'a')
 
1489
        # Two 'Authentication Required' errors should occur (the
 
1490
        # initial 'who are you' and 'I don't know you, who are
 
1491
        # you').
 
1492
        self.assertEqual(2, self.server.auth_required_errors)
 
1493
 
 
1494
    def test_wrong_pass(self):
 
1495
        self.server.add_user('joe', 'foo')
 
1496
        t = self.get_user_transport('joe', 'bar')
 
1497
        self.assertRaises(errors.InvalidHttpResponse, t.get, 'a')
 
1498
        # Two 'Authentication Required' errors should occur (the
 
1499
        # initial 'who are you' and 'this is not you, who are you')
 
1500
        self.assertEqual(2, self.server.auth_required_errors)
 
1501
 
 
1502
    def test_prompt_for_password(self):
 
1503
        if self._testing_pycurl():
 
1504
            raise tests.TestNotApplicable(
 
1505
                'pycurl cannot prompt, it handles auth by embedding'
 
1506
                ' user:pass in urls only')
 
1507
 
 
1508
        self.server.add_user('joe', 'foo')
 
1509
        t = self.get_user_transport('joe', None)
 
1510
        stdout = tests.StringIOWrapper()
 
1511
        ui.ui_factory = tests.TestUIFactory(stdin='foo\n', stdout=stdout)
 
1512
        self.assertEqual('contents of a\n',t.get('a').read())
 
1513
        # stdin should be empty
 
1514
        self.assertEqual('', ui.ui_factory.stdin.readline())
 
1515
        self._check_password_prompt(t._unqualified_scheme, 'joe',
 
1516
                                    stdout.getvalue())
 
1517
        # And we shouldn't prompt again for a different request
 
1518
        # against the same transport.
 
1519
        self.assertEqual('contents of b\n',t.get('b').read())
 
1520
        t2 = t.clone()
 
1521
        # And neither against a clone
 
1522
        self.assertEqual('contents of b\n',t2.get('b').read())
 
1523
        # Only one 'Authentication Required' error should occur
 
1524
        self.assertEqual(1, self.server.auth_required_errors)
 
1525
 
 
1526
    def _check_password_prompt(self, scheme, user, actual_prompt):
 
1527
        expected_prompt = (self._password_prompt_prefix
 
1528
                           + ("%s %s@%s:%d, Realm: '%s' password: "
 
1529
                              % (scheme.upper(),
 
1530
                                 user, self.server.host, self.server.port,
 
1531
                                 self.server.auth_realm)))
 
1532
        self.assertEquals(expected_prompt, actual_prompt)
 
1533
 
 
1534
    def test_no_prompt_for_password_when_using_auth_config(self):
 
1535
        if self._testing_pycurl():
 
1536
            raise tests.TestNotApplicable(
 
1537
                'pycurl does not support authentication.conf'
 
1538
                ' since it cannot prompt')
 
1539
 
 
1540
        user =' joe'
 
1541
        password = 'foo'
 
1542
        stdin_content = 'bar\n'  # Not the right password
 
1543
        self.server.add_user(user, password)
 
1544
        t = self.get_user_transport(user, None)
 
1545
        ui.ui_factory = tests.TestUIFactory(stdin=stdin_content,
 
1546
                                            stdout=tests.StringIOWrapper())
 
1547
        # Create a minimal config file with the right password
 
1548
        conf = config.AuthenticationConfig()
 
1549
        conf._get_config().update(
 
1550
            {'httptest': {'scheme': 'http', 'port': self.server.port,
 
1551
                          'user': user, 'password': password}})
 
1552
        conf._save()
 
1553
        # Issue a request to the server to connect
 
1554
        self.assertEqual('contents of a\n',t.get('a').read())
 
1555
        # stdin should have  been left untouched
 
1556
        self.assertEqual(stdin_content, ui.ui_factory.stdin.readline())
 
1557
        # Only one 'Authentication Required' error should occur
 
1558
        self.assertEqual(1, self.server.auth_required_errors)
 
1559
 
 
1560
    def test_changing_nonce(self):
 
1561
        if self._auth_scheme != 'digest':
 
1562
            raise tests.TestNotApplicable('HTTP auth digest only test')
 
1563
        if self._testing_pycurl():
 
1564
            raise tests.KnownFailure(
 
1565
                'pycurl does not handle a nonce change')
 
1566
        self.server.add_user('joe', 'foo')
 
1567
        t = self.get_user_transport('joe', 'foo')
 
1568
        self.assertEqual('contents of a\n', t.get('a').read())
 
1569
        self.assertEqual('contents of b\n', t.get('b').read())
 
1570
        # Only one 'Authentication Required' error should have
 
1571
        # occured so far
 
1572
        self.assertEqual(1, self.server.auth_required_errors)
 
1573
        # The server invalidates the current nonce
 
1574
        self.server.auth_nonce = self.server.auth_nonce + '. No, now!'
 
1575
        self.assertEqual('contents of a\n', t.get('a').read())
 
1576
        # Two 'Authentication Required' errors should occur (the
 
1577
        # initial 'who are you' and a second 'who are you' with the new nonce)
 
1578
        self.assertEqual(2, self.server.auth_required_errors)
 
1579
 
 
1580
 
 
1581
 
 
1582
class TestProxyAuth(TestAuth):
 
1583
    """Test proxy authentication schemes."""
 
1584
 
 
1585
    _auth_header = 'Proxy-authorization'
 
1586
    _password_prompt_prefix='Proxy '
 
1587
 
 
1588
    def setUp(self):
 
1589
        super(TestProxyAuth, self).setUp()
 
1590
        self._old_env = {}
 
1591
        self.addCleanup(self._restore_env)
 
1592
        # Override the contents to avoid false positives
 
1593
        self.build_tree_contents([('a', 'not proxied contents of a\n'),
 
1594
                                  ('b', 'not proxied contents of b\n'),
 
1595
                                  ('a-proxied', 'contents of a\n'),
 
1596
                                  ('b-proxied', 'contents of b\n'),
 
1597
                                  ])
 
1598
 
 
1599
    def create_transport_readonly_server(self):
 
1600
        if self._auth_scheme == 'basic':
 
1601
            server = http_utils.ProxyBasicAuthServer(
 
1602
                protocol_version=self._protocol_version)
 
1603
        else:
 
1604
            if self._auth_scheme != 'digest':
 
1605
                raise AssertionError('Unknown auth scheme: %r'
 
1606
                                     % self._auth_scheme)
 
1607
            server = http_utils.ProxyDigestAuthServer(
 
1608
                protocol_version=self._protocol_version)
 
1609
        return server
 
1610
 
 
1611
    def get_user_transport(self, user=None, password=None):
 
1612
        self._install_env({'all_proxy': self.get_user_url(user, password)})
 
1613
        return self._transport(self.server.get_url())
 
1614
 
 
1615
    def _install_env(self, env):
 
1616
        for name, value in env.iteritems():
 
1617
            self._old_env[name] = osutils.set_or_unset_env(name, value)
 
1618
 
 
1619
    def _restore_env(self):
 
1620
        for name, value in self._old_env.iteritems():
 
1621
            osutils.set_or_unset_env(name, value)
 
1622
 
 
1623
    def test_empty_pass(self):
 
1624
        if self._testing_pycurl():
 
1625
            import pycurl
 
1626
            if pycurl.version_info()[1] < '7.16.0':
 
1627
                raise tests.KnownFailure(
 
1628
                    'pycurl < 7.16.0 does not handle empty proxy passwords')
 
1629
        super(TestProxyAuth, self).test_empty_pass()
 
1630
 
 
1631
 
 
1632
class SampleSocket(object):
 
1633
    """A socket-like object for use in testing the HTTP request handler."""
 
1634
 
 
1635
    def __init__(self, socket_read_content):
 
1636
        """Constructs a sample socket.
 
1637
 
 
1638
        :param socket_read_content: a byte sequence
 
1639
        """
 
1640
        # Use plain python StringIO so we can monkey-patch the close method to
 
1641
        # not discard the contents.
 
1642
        from StringIO import StringIO
 
1643
        self.readfile = StringIO(socket_read_content)
 
1644
        self.writefile = StringIO()
 
1645
        self.writefile.close = lambda: None
 
1646
 
 
1647
    def makefile(self, mode='r', bufsize=None):
 
1648
        if 'r' in mode:
 
1649
            return self.readfile
 
1650
        else:
 
1651
            return self.writefile
 
1652
 
 
1653
 
 
1654
class SmartHTTPTunnellingTest(tests.TestCaseWithTransport):
 
1655
 
 
1656
    def setUp(self):
 
1657
        super(SmartHTTPTunnellingTest, self).setUp()
 
1658
        # We use the VFS layer as part of HTTP tunnelling tests.
 
1659
        self._captureVar('BZR_NO_SMART_VFS', None)
 
1660
        self.transport_readonly_server = http_utils.HTTPServerWithSmarts
 
1661
 
 
1662
    def create_transport_readonly_server(self):
 
1663
        return http_utils.HTTPServerWithSmarts(
 
1664
            protocol_version=self._protocol_version)
 
1665
 
 
1666
    def test_open_bzrdir(self):
 
1667
        branch = self.make_branch('relpath')
 
1668
        http_server = self.get_readonly_server()
 
1669
        url = http_server.get_url() + 'relpath'
 
1670
        bd = bzrdir.BzrDir.open(url)
 
1671
        self.assertIsInstance(bd, _mod_remote.RemoteBzrDir)
 
1672
 
 
1673
    def test_bulk_data(self):
 
1674
        # We should be able to send and receive bulk data in a single message.
 
1675
        # The 'readv' command in the smart protocol both sends and receives
 
1676
        # bulk data, so we use that.
 
1677
        self.build_tree(['data-file'])
 
1678
        http_server = self.get_readonly_server()
 
1679
        http_transport = self._transport(http_server.get_url())
 
1680
        medium = http_transport.get_smart_medium()
 
1681
        # Since we provide the medium, the url below will be mostly ignored
 
1682
        # during the test, as long as the path is '/'.
 
1683
        remote_transport = remote.RemoteTransport('bzr://fake_host/',
 
1684
                                                  medium=medium)
 
1685
        self.assertEqual(
 
1686
            [(0, "c")], list(remote_transport.readv("data-file", [(0,1)])))
 
1687
 
 
1688
    def test_http_send_smart_request(self):
 
1689
 
 
1690
        post_body = 'hello\n'
 
1691
        expected_reply_body = 'ok\x012\n'
 
1692
 
 
1693
        http_server = self.get_readonly_server()
 
1694
        http_transport = self._transport(http_server.get_url())
 
1695
        medium = http_transport.get_smart_medium()
 
1696
        response = medium.send_http_smart_request(post_body)
 
1697
        reply_body = response.read()
 
1698
        self.assertEqual(expected_reply_body, reply_body)
 
1699
 
 
1700
    def test_smart_http_server_post_request_handler(self):
 
1701
        httpd = self.get_readonly_server()._get_httpd()
 
1702
 
 
1703
        socket = SampleSocket(
 
1704
            'POST /.bzr/smart %s \r\n' % self._protocol_version
 
1705
            # HTTP/1.1 posts must have a Content-Length (but it doesn't hurt
 
1706
            # for 1.0)
 
1707
            + 'Content-Length: 6\r\n'
 
1708
            '\r\n'
 
1709
            'hello\n')
 
1710
        # Beware: the ('localhost', 80) below is the
 
1711
        # client_address parameter, but we don't have one because
 
1712
        # we have defined a socket which is not bound to an
 
1713
        # address. The test framework never uses this client
 
1714
        # address, so far...
 
1715
        request_handler = http_utils.SmartRequestHandler(socket,
 
1716
                                                         ('localhost', 80),
 
1717
                                                         httpd)
 
1718
        response = socket.writefile.getvalue()
 
1719
        self.assertStartsWith(response, '%s 200 ' % self._protocol_version)
 
1720
        # This includes the end of the HTTP headers, and all the body.
 
1721
        expected_end_of_response = '\r\n\r\nok\x012\n'
 
1722
        self.assertEndsWith(response, expected_end_of_response)
 
1723
 
 
1724
 
 
1725
class ForbiddenRequestHandler(http_server.TestingHTTPRequestHandler):
 
1726
    """No smart server here request handler."""
 
1727
 
 
1728
    def do_POST(self):
 
1729
        self.send_error(403, "Forbidden")
 
1730
 
 
1731
 
 
1732
class SmartClientAgainstNotSmartServer(TestSpecificRequestHandler):
 
1733
    """Test smart client behaviour against an http server without smarts."""
 
1734
 
 
1735
    _req_handler_class = ForbiddenRequestHandler
 
1736
 
 
1737
    def test_probe_smart_server(self):
 
1738
        """Test error handling against server refusing smart requests."""
 
1739
        server = self.get_readonly_server()
 
1740
        t = self._transport(server.get_url())
 
1741
        # No need to build a valid smart request here, the server will not even
 
1742
        # try to interpret it.
 
1743
        self.assertRaises(errors.SmartProtocolError,
 
1744
                          t.get_smart_medium().send_http_smart_request,
 
1745
                          'whatever')
 
1746
 
 
1747
class Test_redirected_to(tests.TestCase):
 
1748
 
 
1749
    def test_redirected_to_subdir(self):
 
1750
        t = self._transport('http://www.example.com/foo')
 
1751
        r = t._redirected_to('http://www.example.com/foo',
 
1752
                             'http://www.example.com/foo/subdir')
 
1753
        self.assertIsInstance(r, type(t))
 
1754
        # Both transports share the some connection
 
1755
        self.assertEquals(t._get_connection(), r._get_connection())
 
1756
 
 
1757
    def test_redirected_to_self_with_slash(self):
 
1758
        t = self._transport('http://www.example.com/foo')
 
1759
        r = t._redirected_to('http://www.example.com/foo',
 
1760
                             'http://www.example.com/foo/')
 
1761
        self.assertIsInstance(r, type(t))
 
1762
        # Both transports share the some connection (one can argue that we
 
1763
        # should return the exact same transport here, but that seems
 
1764
        # overkill).
 
1765
        self.assertEquals(t._get_connection(), r._get_connection())
 
1766
 
 
1767
    def test_redirected_to_host(self):
 
1768
        t = self._transport('http://www.example.com/foo')
 
1769
        r = t._redirected_to('http://www.example.com/foo',
 
1770
                             'http://foo.example.com/foo/subdir')
 
1771
        self.assertIsInstance(r, type(t))
 
1772
 
 
1773
    def test_redirected_to_same_host_sibling_protocol(self):
 
1774
        t = self._transport('http://www.example.com/foo')
 
1775
        r = t._redirected_to('http://www.example.com/foo',
 
1776
                             'https://www.example.com/foo')
 
1777
        self.assertIsInstance(r, type(t))
 
1778
 
 
1779
    def test_redirected_to_same_host_different_protocol(self):
 
1780
        t = self._transport('http://www.example.com/foo')
 
1781
        r = t._redirected_to('http://www.example.com/foo',
 
1782
                             'ftp://www.example.com/foo')
 
1783
        self.assertNotEquals(type(r), type(t))
 
1784
 
 
1785
    def test_redirected_to_different_host_same_user(self):
 
1786
        t = self._transport('http://joe@www.example.com/foo')
 
1787
        r = t._redirected_to('http://www.example.com/foo',
 
1788
                             'https://foo.example.com/foo')
 
1789
        self.assertIsInstance(r, type(t))
 
1790
        self.assertEquals(t._user, r._user)