/brz/remove-bazaar

To get this branch, use:
bzr branch http://gegoxaren.bato24.eu/bzr/brz/remove-bazaar

« back to all changes in this revision

Viewing changes to bzrlib/tests/test_http.py

  • Committer: Vincent Ladeuil
  • Date: 2009-11-03 09:32:17 UTC
  • mto: (4784.1.1 integration)
  • mto: This revision was merged to the branch mainline in revision 4785.
  • Revision ID: v.ladeuil+lp@free.fr-20091103093217-sodpsi6fzb36vk9y
Hooks daughter classes should always call the base constructor

* bzrlib/tests/test_hooks.py:
(TestHookRegistry.test_items_are_reasonable_keys): Reproduce bug
#389648 and ensures we won't regress again.

* bzrlib/version_info_formats/format_rio.py:
(RioVersionInfoBuilderHooks.__init__): Call base constructor.

* bzrlib/info.py:
(InfoHooks.__init__): Call base constructor.

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
# Copyright (C) 2005, 2006, 2007, 2008, 2009 Canonical Ltd
 
2
#
 
3
# This program is free software; you can redistribute it and/or modify
 
4
# it under the terms of the GNU General Public License as published by
 
5
# the Free Software Foundation; either version 2 of the License, or
 
6
# (at your option) any later version.
 
7
#
 
8
# This program is distributed in the hope that it will be useful,
 
9
# but WITHOUT ANY WARRANTY; without even the implied warranty of
 
10
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
11
# GNU General Public License for more details.
 
12
#
 
13
# You should have received a copy of the GNU General Public License
 
14
# along with this program; if not, write to the Free Software
 
15
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
 
16
 
 
17
"""Tests for HTTP implementations.
 
18
 
 
19
This module defines a load_tests() method that parametrize tests classes for
 
20
transport implementation, http protocol versions and authentication schemes.
 
21
"""
 
22
 
 
23
# TODO: Should be renamed to bzrlib.transport.http.tests?
 
24
# TODO: What about renaming to bzrlib.tests.transport.http ?
 
25
 
 
26
from cStringIO import StringIO
 
27
import httplib
 
28
import os
 
29
import select
 
30
import SimpleHTTPServer
 
31
import socket
 
32
import sys
 
33
import threading
 
34
 
 
35
import bzrlib
 
36
from bzrlib import (
 
37
    bzrdir,
 
38
    config,
 
39
    errors,
 
40
    osutils,
 
41
    remote as _mod_remote,
 
42
    tests,
 
43
    transport,
 
44
    ui,
 
45
    urlutils,
 
46
    )
 
47
from bzrlib.symbol_versioning import (
 
48
    deprecated_in,
 
49
    )
 
50
from bzrlib.tests import (
 
51
    http_server,
 
52
    http_utils,
 
53
    )
 
54
from bzrlib.transport import (
 
55
    http,
 
56
    remote,
 
57
    )
 
58
from bzrlib.transport.http import (
 
59
    _urllib,
 
60
    _urllib2_wrappers,
 
61
    )
 
62
 
 
63
 
 
64
try:
 
65
    from bzrlib.transport.http._pycurl import PyCurlTransport
 
66
    pycurl_present = True
 
67
except errors.DependencyNotPresent:
 
68
    pycurl_present = False
 
69
 
 
70
 
 
71
def load_tests(standard_tests, module, loader):
 
72
    """Multiply tests for http clients and protocol versions."""
 
73
    result = loader.suiteClass()
 
74
 
 
75
    # one for each transport implementation
 
76
    t_tests, remaining_tests = tests.split_suite_by_condition(
 
77
        standard_tests, tests.condition_isinstance((
 
78
                TestHttpTransportRegistration,
 
79
                TestHttpTransportUrls,
 
80
                Test_redirected_to,
 
81
                )))
 
82
    transport_scenarios = [
 
83
        ('urllib', dict(_transport=_urllib.HttpTransport_urllib,
 
84
                        _server=http_server.HttpServer_urllib,
 
85
                        _qualified_prefix='http+urllib',)),
 
86
        ]
 
87
    if pycurl_present:
 
88
        transport_scenarios.append(
 
89
            ('pycurl', dict(_transport=PyCurlTransport,
 
90
                            _server=http_server.HttpServer_PyCurl,
 
91
                            _qualified_prefix='http+pycurl',)))
 
92
    tests.multiply_tests(t_tests, transport_scenarios, result)
 
93
 
 
94
    # each implementation tested with each HTTP version
 
95
    tp_tests, remaining_tests = tests.split_suite_by_condition(
 
96
        remaining_tests, tests.condition_isinstance((
 
97
                SmartHTTPTunnellingTest,
 
98
                TestDoCatchRedirections,
 
99
                TestHTTPConnections,
 
100
                TestHTTPRedirections,
 
101
                TestHTTPSilentRedirections,
 
102
                TestLimitedRangeRequestServer,
 
103
                TestPost,
 
104
                TestProxyHttpServer,
 
105
                TestRanges,
 
106
                TestSpecificRequestHandler,
 
107
                )))
 
108
    protocol_scenarios = [
 
109
            ('HTTP/1.0',  dict(_protocol_version='HTTP/1.0')),
 
110
            ('HTTP/1.1',  dict(_protocol_version='HTTP/1.1')),
 
111
            ]
 
112
    tp_scenarios = tests.multiply_scenarios(transport_scenarios,
 
113
                                            protocol_scenarios)
 
114
    tests.multiply_tests(tp_tests, tp_scenarios, result)
 
115
 
 
116
    # proxy auth: each auth scheme on all http versions on all implementations.
 
117
    tppa_tests, remaining_tests = tests.split_suite_by_condition(
 
118
        remaining_tests, tests.condition_isinstance((
 
119
                TestProxyAuth,
 
120
                )))
 
121
    proxy_auth_scheme_scenarios = [
 
122
        ('basic', dict(_auth_server=http_utils.ProxyBasicAuthServer)),
 
123
        ('digest', dict(_auth_server=http_utils.ProxyDigestAuthServer)),
 
124
        ('basicdigest',
 
125
         dict(_auth_server=http_utils.ProxyBasicAndDigestAuthServer)),
 
126
        ]
 
127
    tppa_scenarios = tests.multiply_scenarios(tp_scenarios,
 
128
                                              proxy_auth_scheme_scenarios)
 
129
    tests.multiply_tests(tppa_tests, tppa_scenarios, result)
 
130
 
 
131
    # auth: each auth scheme on all http versions on all implementations.
 
132
    tpa_tests, remaining_tests = tests.split_suite_by_condition(
 
133
        remaining_tests, tests.condition_isinstance((
 
134
                TestAuth,
 
135
                )))
 
136
    auth_scheme_scenarios = [
 
137
        ('basic', dict(_auth_server=http_utils.HTTPBasicAuthServer)),
 
138
        ('digest', dict(_auth_server=http_utils.HTTPDigestAuthServer)),
 
139
        ('basicdigest',
 
140
         dict(_auth_server=http_utils.HTTPBasicAndDigestAuthServer)),
 
141
        ]
 
142
    tpa_scenarios = tests.multiply_scenarios(tp_scenarios,
 
143
                                             auth_scheme_scenarios)
 
144
    tests.multiply_tests(tpa_tests, tpa_scenarios, result)
 
145
 
 
146
    # activity: on all http[s] versions on all implementations
 
147
    tpact_tests, remaining_tests = tests.split_suite_by_condition(
 
148
        remaining_tests, tests.condition_isinstance((
 
149
                TestActivity,
 
150
                )))
 
151
    activity_scenarios = [
 
152
        ('urllib,http', dict(_activity_server=ActivityHTTPServer,
 
153
                             _transport=_urllib.HttpTransport_urllib,)),
 
154
        ]
 
155
    if tests.HTTPSServerFeature.available():
 
156
        activity_scenarios.append(
 
157
            ('urllib,https', dict(_activity_server=ActivityHTTPSServer,
 
158
                                  _transport=_urllib.HttpTransport_urllib,)),)
 
159
    if pycurl_present:
 
160
        activity_scenarios.append(
 
161
            ('pycurl,http', dict(_activity_server=ActivityHTTPServer,
 
162
                                 _transport=PyCurlTransport,)),)
 
163
        if tests.HTTPSServerFeature.available():
 
164
            from bzrlib.tests import (
 
165
                ssl_certs,
 
166
                )
 
167
            # FIXME: Until we have a better way to handle self-signed
 
168
            # certificates (like allowing them in a test specific
 
169
            # authentication.conf for example), we need some specialized pycurl
 
170
            # transport for tests.
 
171
            class HTTPS_pycurl_transport(PyCurlTransport):
 
172
 
 
173
                def __init__(self, base, _from_transport=None):
 
174
                    super(HTTPS_pycurl_transport, self).__init__(
 
175
                        base, _from_transport)
 
176
                    self.cabundle = str(ssl_certs.build_path('ca.crt'))
 
177
 
 
178
            activity_scenarios.append(
 
179
                ('pycurl,https', dict(_activity_server=ActivityHTTPSServer,
 
180
                                      _transport=HTTPS_pycurl_transport,)),)
 
181
 
 
182
    tpact_scenarios = tests.multiply_scenarios(activity_scenarios,
 
183
                                               protocol_scenarios)
 
184
    tests.multiply_tests(tpact_tests, tpact_scenarios, result)
 
185
 
 
186
    # No parametrization for the remaining tests
 
187
    result.addTests(remaining_tests)
 
188
 
 
189
    return result
 
190
 
 
191
 
 
192
class FakeManager(object):
 
193
 
 
194
    def __init__(self):
 
195
        self.credentials = []
 
196
 
 
197
    def add_password(self, realm, host, username, password):
 
198
        self.credentials.append([realm, host, username, password])
 
199
 
 
200
 
 
201
class RecordingServer(object):
 
202
    """A fake HTTP server.
 
203
 
 
204
    It records the bytes sent to it, and replies with a 200.
 
205
    """
 
206
 
 
207
    def __init__(self, expect_body_tail=None, scheme=''):
 
208
        """Constructor.
 
209
 
 
210
        :type expect_body_tail: str
 
211
        :param expect_body_tail: a reply won't be sent until this string is
 
212
            received.
 
213
        """
 
214
        self._expect_body_tail = expect_body_tail
 
215
        self.host = None
 
216
        self.port = None
 
217
        self.received_bytes = ''
 
218
        self.scheme = scheme
 
219
 
 
220
    def get_url(self):
 
221
        return '%s://%s:%s/' % (self.scheme, self.host, self.port)
 
222
 
 
223
    def setUp(self):
 
224
        self._sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
 
225
        self._sock.bind(('127.0.0.1', 0))
 
226
        self.host, self.port = self._sock.getsockname()
 
227
        self._ready = threading.Event()
 
228
        self._thread = threading.Thread(target=self._accept_read_and_reply)
 
229
        self._thread.setDaemon(True)
 
230
        self._thread.start()
 
231
        self._ready.wait(5)
 
232
 
 
233
    def _accept_read_and_reply(self):
 
234
        self._sock.listen(1)
 
235
        self._ready.set()
 
236
        self._sock.settimeout(5)
 
237
        try:
 
238
            conn, address = self._sock.accept()
 
239
            # On win32, the accepted connection will be non-blocking to start
 
240
            # with because we're using settimeout.
 
241
            conn.setblocking(True)
 
242
            while not self.received_bytes.endswith(self._expect_body_tail):
 
243
                self.received_bytes += conn.recv(4096)
 
244
            conn.sendall('HTTP/1.1 200 OK\r\n')
 
245
        except socket.timeout:
 
246
            # Make sure the client isn't stuck waiting for us to e.g. accept.
 
247
            self._sock.close()
 
248
        except socket.error:
 
249
            # The client may have already closed the socket.
 
250
            pass
 
251
 
 
252
    def tearDown(self):
 
253
        try:
 
254
            self._sock.close()
 
255
        except socket.error:
 
256
            # We might have already closed it.  We don't care.
 
257
            pass
 
258
        self.host = None
 
259
        self.port = None
 
260
 
 
261
 
 
262
class TestAuthHeader(tests.TestCase):
 
263
 
 
264
    def parse_header(self, header, auth_handler_class=None):
 
265
        if auth_handler_class is None:
 
266
            auth_handler_class = _urllib2_wrappers.AbstractAuthHandler
 
267
        self.auth_handler =  auth_handler_class()
 
268
        return self.auth_handler._parse_auth_header(header)
 
269
 
 
270
    def test_empty_header(self):
 
271
        scheme, remainder = self.parse_header('')
 
272
        self.assertEquals('', scheme)
 
273
        self.assertIs(None, remainder)
 
274
 
 
275
    def test_negotiate_header(self):
 
276
        scheme, remainder = self.parse_header('Negotiate')
 
277
        self.assertEquals('negotiate', scheme)
 
278
        self.assertIs(None, remainder)
 
279
 
 
280
    def test_basic_header(self):
 
281
        scheme, remainder = self.parse_header(
 
282
            'Basic realm="Thou should not pass"')
 
283
        self.assertEquals('basic', scheme)
 
284
        self.assertEquals('realm="Thou should not pass"', remainder)
 
285
 
 
286
    def test_basic_extract_realm(self):
 
287
        scheme, remainder = self.parse_header(
 
288
            'Basic realm="Thou should not pass"',
 
289
            _urllib2_wrappers.BasicAuthHandler)
 
290
        match, realm = self.auth_handler.extract_realm(remainder)
 
291
        self.assertTrue(match is not None)
 
292
        self.assertEquals('Thou should not pass', realm)
 
293
 
 
294
    def test_digest_header(self):
 
295
        scheme, remainder = self.parse_header(
 
296
            'Digest realm="Thou should not pass"')
 
297
        self.assertEquals('digest', scheme)
 
298
        self.assertEquals('realm="Thou should not pass"', remainder)
 
299
 
 
300
 
 
301
class TestHTTPServer(tests.TestCase):
 
302
    """Test the HTTP servers implementations."""
 
303
 
 
304
    def test_invalid_protocol(self):
 
305
        class BogusRequestHandler(http_server.TestingHTTPRequestHandler):
 
306
 
 
307
            protocol_version = 'HTTP/0.1'
 
308
 
 
309
        server = http_server.HttpServer(BogusRequestHandler)
 
310
        try:
 
311
            self.assertRaises(httplib.UnknownProtocol, server.setUp)
 
312
        except:
 
313
            server.tearDown()
 
314
            self.fail('HTTP Server creation did not raise UnknownProtocol')
 
315
 
 
316
    def test_force_invalid_protocol(self):
 
317
        server = http_server.HttpServer(protocol_version='HTTP/0.1')
 
318
        try:
 
319
            self.assertRaises(httplib.UnknownProtocol, server.setUp)
 
320
        except:
 
321
            server.tearDown()
 
322
            self.fail('HTTP Server creation did not raise UnknownProtocol')
 
323
 
 
324
    def test_server_start_and_stop(self):
 
325
        server = http_server.HttpServer()
 
326
        server.setUp()
 
327
        try:
 
328
            self.assertTrue(server._http_running)
 
329
        finally:
 
330
            server.tearDown()
 
331
        self.assertFalse(server._http_running)
 
332
 
 
333
    def test_create_http_server_one_zero(self):
 
334
        class RequestHandlerOneZero(http_server.TestingHTTPRequestHandler):
 
335
 
 
336
            protocol_version = 'HTTP/1.0'
 
337
 
 
338
        server = http_server.HttpServer(RequestHandlerOneZero)
 
339
        self.start_server(server)
 
340
        self.assertIsInstance(server._httpd, http_server.TestingHTTPServer)
 
341
 
 
342
    def test_create_http_server_one_one(self):
 
343
        class RequestHandlerOneOne(http_server.TestingHTTPRequestHandler):
 
344
 
 
345
            protocol_version = 'HTTP/1.1'
 
346
 
 
347
        server = http_server.HttpServer(RequestHandlerOneOne)
 
348
        self.start_server(server)
 
349
        self.assertIsInstance(server._httpd,
 
350
                              http_server.TestingThreadingHTTPServer)
 
351
 
 
352
    def test_create_http_server_force_one_one(self):
 
353
        class RequestHandlerOneZero(http_server.TestingHTTPRequestHandler):
 
354
 
 
355
            protocol_version = 'HTTP/1.0'
 
356
 
 
357
        server = http_server.HttpServer(RequestHandlerOneZero,
 
358
                                        protocol_version='HTTP/1.1')
 
359
        self.start_server(server)
 
360
        self.assertIsInstance(server._httpd,
 
361
                              http_server.TestingThreadingHTTPServer)
 
362
 
 
363
    def test_create_http_server_force_one_zero(self):
 
364
        class RequestHandlerOneOne(http_server.TestingHTTPRequestHandler):
 
365
 
 
366
            protocol_version = 'HTTP/1.1'
 
367
 
 
368
        server = http_server.HttpServer(RequestHandlerOneOne,
 
369
                                        protocol_version='HTTP/1.0')
 
370
        self.start_server(server)
 
371
        self.assertIsInstance(server._httpd,
 
372
                              http_server.TestingHTTPServer)
 
373
 
 
374
 
 
375
class TestWithTransport_pycurl(object):
 
376
    """Test case to inherit from if pycurl is present"""
 
377
 
 
378
    def _get_pycurl_maybe(self):
 
379
        try:
 
380
            from bzrlib.transport.http._pycurl import PyCurlTransport
 
381
            return PyCurlTransport
 
382
        except errors.DependencyNotPresent:
 
383
            raise tests.TestSkipped('pycurl not present')
 
384
 
 
385
    _transport = property(_get_pycurl_maybe)
 
386
 
 
387
 
 
388
class TestHttpUrls(tests.TestCase):
 
389
 
 
390
    # TODO: This should be moved to authorization tests once they
 
391
    # are written.
 
392
 
 
393
    def test_url_parsing(self):
 
394
        f = FakeManager()
 
395
        url = http.extract_auth('http://example.com', f)
 
396
        self.assertEquals('http://example.com', url)
 
397
        self.assertEquals(0, len(f.credentials))
 
398
        url = http.extract_auth(
 
399
            'http://user:pass@www.bazaar-vcs.org/bzr/bzr.dev', f)
 
400
        self.assertEquals('http://www.bazaar-vcs.org/bzr/bzr.dev', url)
 
401
        self.assertEquals(1, len(f.credentials))
 
402
        self.assertEquals([None, 'www.bazaar-vcs.org', 'user', 'pass'],
 
403
                          f.credentials[0])
 
404
 
 
405
 
 
406
class TestHttpTransportUrls(tests.TestCase):
 
407
    """Test the http urls."""
 
408
 
 
409
    def test_abs_url(self):
 
410
        """Construction of absolute http URLs"""
 
411
        t = self._transport('http://bazaar-vcs.org/bzr/bzr.dev/')
 
412
        eq = self.assertEqualDiff
 
413
        eq(t.abspath('.'), 'http://bazaar-vcs.org/bzr/bzr.dev')
 
414
        eq(t.abspath('foo/bar'), 'http://bazaar-vcs.org/bzr/bzr.dev/foo/bar')
 
415
        eq(t.abspath('.bzr'), 'http://bazaar-vcs.org/bzr/bzr.dev/.bzr')
 
416
        eq(t.abspath('.bzr/1//2/./3'),
 
417
           'http://bazaar-vcs.org/bzr/bzr.dev/.bzr/1/2/3')
 
418
 
 
419
    def test_invalid_http_urls(self):
 
420
        """Trap invalid construction of urls"""
 
421
        t = self._transport('http://bazaar-vcs.org/bzr/bzr.dev/')
 
422
        self.assertRaises(errors.InvalidURL,
 
423
                          self._transport,
 
424
                          'http://http://bazaar-vcs.org/bzr/bzr.dev/')
 
425
 
 
426
    def test_http_root_urls(self):
 
427
        """Construction of URLs from server root"""
 
428
        t = self._transport('http://bzr.ozlabs.org/')
 
429
        eq = self.assertEqualDiff
 
430
        eq(t.abspath('.bzr/tree-version'),
 
431
           'http://bzr.ozlabs.org/.bzr/tree-version')
 
432
 
 
433
    def test_http_impl_urls(self):
 
434
        """There are servers which ask for particular clients to connect"""
 
435
        server = self._server()
 
436
        server.setUp()
 
437
        try:
 
438
            url = server.get_url()
 
439
            self.assertTrue(url.startswith('%s://' % self._qualified_prefix))
 
440
        finally:
 
441
            server.tearDown()
 
442
 
 
443
 
 
444
class TestHttps_pycurl(TestWithTransport_pycurl, tests.TestCase):
 
445
 
 
446
    # TODO: This should really be moved into another pycurl
 
447
    # specific test. When https tests will be implemented, take
 
448
    # this one into account.
 
449
    def test_pycurl_without_https_support(self):
 
450
        """Test that pycurl without SSL do not fail with a traceback.
 
451
 
 
452
        For the purpose of the test, we force pycurl to ignore
 
453
        https by supplying a fake version_info that do not
 
454
        support it.
 
455
        """
 
456
        try:
 
457
            import pycurl
 
458
        except ImportError:
 
459
            raise tests.TestSkipped('pycurl not present')
 
460
 
 
461
        version_info_orig = pycurl.version_info
 
462
        try:
 
463
            # Now that we have pycurl imported, we can fake its version_info
 
464
            # This was taken from a windows pycurl without SSL
 
465
            # (thanks to bialix)
 
466
            pycurl.version_info = lambda : (2,
 
467
                                            '7.13.2',
 
468
                                            462082,
 
469
                                            'i386-pc-win32',
 
470
                                            2576,
 
471
                                            None,
 
472
                                            0,
 
473
                                            None,
 
474
                                            ('ftp', 'gopher', 'telnet',
 
475
                                             'dict', 'ldap', 'http', 'file'),
 
476
                                            None,
 
477
                                            0,
 
478
                                            None)
 
479
            self.assertRaises(errors.DependencyNotPresent, self._transport,
 
480
                              'https://launchpad.net')
 
481
        finally:
 
482
            # Restore the right function
 
483
            pycurl.version_info = version_info_orig
 
484
 
 
485
 
 
486
class TestHTTPConnections(http_utils.TestCaseWithWebserver):
 
487
    """Test the http connections."""
 
488
 
 
489
    def setUp(self):
 
490
        http_utils.TestCaseWithWebserver.setUp(self)
 
491
        self.build_tree(['foo/', 'foo/bar'], line_endings='binary',
 
492
                        transport=self.get_transport())
 
493
 
 
494
    def test_http_has(self):
 
495
        server = self.get_readonly_server()
 
496
        t = self._transport(server.get_url())
 
497
        self.assertEqual(t.has('foo/bar'), True)
 
498
        self.assertEqual(len(server.logs), 1)
 
499
        self.assertContainsRe(server.logs[0],
 
500
            r'"HEAD /foo/bar HTTP/1.." (200|302) - "-" "bzr/')
 
501
 
 
502
    def test_http_has_not_found(self):
 
503
        server = self.get_readonly_server()
 
504
        t = self._transport(server.get_url())
 
505
        self.assertEqual(t.has('not-found'), False)
 
506
        self.assertContainsRe(server.logs[1],
 
507
            r'"HEAD /not-found HTTP/1.." 404 - "-" "bzr/')
 
508
 
 
509
    def test_http_get(self):
 
510
        server = self.get_readonly_server()
 
511
        t = self._transport(server.get_url())
 
512
        fp = t.get('foo/bar')
 
513
        self.assertEqualDiff(
 
514
            fp.read(),
 
515
            'contents of foo/bar\n')
 
516
        self.assertEqual(len(server.logs), 1)
 
517
        self.assertTrue(server.logs[0].find(
 
518
            '"GET /foo/bar HTTP/1.1" 200 - "-" "bzr/%s'
 
519
            % bzrlib.__version__) > -1)
 
520
 
 
521
    def test_has_on_bogus_host(self):
 
522
        # Get a free address and don't 'accept' on it, so that we
 
523
        # can be sure there is no http handler there, but set a
 
524
        # reasonable timeout to not slow down tests too much.
 
525
        default_timeout = socket.getdefaulttimeout()
 
526
        try:
 
527
            socket.setdefaulttimeout(2)
 
528
            s = socket.socket()
 
529
            s.bind(('localhost', 0))
 
530
            t = self._transport('http://%s:%s/' % s.getsockname())
 
531
            self.assertRaises(errors.ConnectionError, t.has, 'foo/bar')
 
532
        finally:
 
533
            socket.setdefaulttimeout(default_timeout)
 
534
 
 
535
 
 
536
class TestHttpTransportRegistration(tests.TestCase):
 
537
    """Test registrations of various http implementations"""
 
538
 
 
539
    def test_http_registered(self):
 
540
        t = transport.get_transport('%s://foo.com/' % self._qualified_prefix)
 
541
        self.assertIsInstance(t, transport.Transport)
 
542
        self.assertIsInstance(t, self._transport)
 
543
 
 
544
 
 
545
class TestPost(tests.TestCase):
 
546
 
 
547
    def test_post_body_is_received(self):
 
548
        server = RecordingServer(expect_body_tail='end-of-body',
 
549
            scheme=self._qualified_prefix)
 
550
        self.start_server(server)
 
551
        url = server.get_url()
 
552
        http_transport = self._transport(url)
 
553
        code, response = http_transport._post('abc def end-of-body')
 
554
        self.assertTrue(
 
555
            server.received_bytes.startswith('POST /.bzr/smart HTTP/1.'))
 
556
        self.assertTrue('content-length: 19\r' in server.received_bytes.lower())
 
557
        # The transport should not be assuming that the server can accept
 
558
        # chunked encoding the first time it connects, because HTTP/1.1, so we
 
559
        # check for the literal string.
 
560
        self.assertTrue(
 
561
            server.received_bytes.endswith('\r\n\r\nabc def end-of-body'))
 
562
 
 
563
 
 
564
class TestRangeHeader(tests.TestCase):
 
565
    """Test range_header method"""
 
566
 
 
567
    def check_header(self, value, ranges=[], tail=0):
 
568
        offsets = [ (start, end - start + 1) for start, end in ranges]
 
569
        coalesce = transport.Transport._coalesce_offsets
 
570
        coalesced = list(coalesce(offsets, limit=0, fudge_factor=0))
 
571
        range_header = http.HttpTransportBase._range_header
 
572
        self.assertEqual(value, range_header(coalesced, tail))
 
573
 
 
574
    def test_range_header_single(self):
 
575
        self.check_header('0-9', ranges=[(0,9)])
 
576
        self.check_header('100-109', ranges=[(100,109)])
 
577
 
 
578
    def test_range_header_tail(self):
 
579
        self.check_header('-10', tail=10)
 
580
        self.check_header('-50', tail=50)
 
581
 
 
582
    def test_range_header_multi(self):
 
583
        self.check_header('0-9,100-200,300-5000',
 
584
                          ranges=[(0,9), (100, 200), (300,5000)])
 
585
 
 
586
    def test_range_header_mixed(self):
 
587
        self.check_header('0-9,300-5000,-50',
 
588
                          ranges=[(0,9), (300,5000)],
 
589
                          tail=50)
 
590
 
 
591
 
 
592
class TestSpecificRequestHandler(http_utils.TestCaseWithWebserver):
 
593
    """Tests a specific request handler.
 
594
 
 
595
    Daughter classes are expected to override _req_handler_class
 
596
    """
 
597
 
 
598
    # Provide a useful default
 
599
    _req_handler_class = http_server.TestingHTTPRequestHandler
 
600
 
 
601
    def create_transport_readonly_server(self):
 
602
        return http_server.HttpServer(self._req_handler_class,
 
603
                                      protocol_version=self._protocol_version)
 
604
 
 
605
    def _testing_pycurl(self):
 
606
        return pycurl_present and self._transport == PyCurlTransport
 
607
 
 
608
 
 
609
class WallRequestHandler(http_server.TestingHTTPRequestHandler):
 
610
    """Whatever request comes in, close the connection"""
 
611
 
 
612
    def handle_one_request(self):
 
613
        """Handle a single HTTP request, by abruptly closing the connection"""
 
614
        self.close_connection = 1
 
615
 
 
616
 
 
617
class TestWallServer(TestSpecificRequestHandler):
 
618
    """Tests exceptions during the connection phase"""
 
619
 
 
620
    _req_handler_class = WallRequestHandler
 
621
 
 
622
    def test_http_has(self):
 
623
        server = self.get_readonly_server()
 
624
        t = self._transport(server.get_url())
 
625
        # Unfortunately httplib (see HTTPResponse._read_status
 
626
        # for details) make no distinction between a closed
 
627
        # socket and badly formatted status line, so we can't
 
628
        # just test for ConnectionError, we have to test
 
629
        # InvalidHttpResponse too. And pycurl may raise ConnectionReset
 
630
        # instead of ConnectionError too.
 
631
        self.assertRaises(( errors.ConnectionError, errors.ConnectionReset,
 
632
                            errors.InvalidHttpResponse),
 
633
                          t.has, 'foo/bar')
 
634
 
 
635
    def test_http_get(self):
 
636
        server = self.get_readonly_server()
 
637
        t = self._transport(server.get_url())
 
638
        self.assertRaises((errors.ConnectionError, errors.ConnectionReset,
 
639
                           errors.InvalidHttpResponse),
 
640
                          t.get, 'foo/bar')
 
641
 
 
642
 
 
643
class BadStatusRequestHandler(http_server.TestingHTTPRequestHandler):
 
644
    """Whatever request comes in, returns a bad status"""
 
645
 
 
646
    def parse_request(self):
 
647
        """Fakes handling a single HTTP request, returns a bad status"""
 
648
        ignored = http_server.TestingHTTPRequestHandler.parse_request(self)
 
649
        self.send_response(0, "Bad status")
 
650
        self.close_connection = 1
 
651
        return False
 
652
 
 
653
 
 
654
class TestBadStatusServer(TestSpecificRequestHandler):
 
655
    """Tests bad status from server."""
 
656
 
 
657
    _req_handler_class = BadStatusRequestHandler
 
658
 
 
659
    def test_http_has(self):
 
660
        server = self.get_readonly_server()
 
661
        t = self._transport(server.get_url())
 
662
        self.assertRaises(errors.InvalidHttpResponse, t.has, 'foo/bar')
 
663
 
 
664
    def test_http_get(self):
 
665
        server = self.get_readonly_server()
 
666
        t = self._transport(server.get_url())
 
667
        self.assertRaises(errors.InvalidHttpResponse, t.get, 'foo/bar')
 
668
 
 
669
 
 
670
class InvalidStatusRequestHandler(http_server.TestingHTTPRequestHandler):
 
671
    """Whatever request comes in, returns an invalid status"""
 
672
 
 
673
    def parse_request(self):
 
674
        """Fakes handling a single HTTP request, returns a bad status"""
 
675
        ignored = http_server.TestingHTTPRequestHandler.parse_request(self)
 
676
        self.wfile.write("Invalid status line\r\n")
 
677
        return False
 
678
 
 
679
 
 
680
class TestInvalidStatusServer(TestBadStatusServer):
 
681
    """Tests invalid status from server.
 
682
 
 
683
    Both implementations raises the same error as for a bad status.
 
684
    """
 
685
 
 
686
    _req_handler_class = InvalidStatusRequestHandler
 
687
 
 
688
    def test_http_has(self):
 
689
        if self._testing_pycurl() and self._protocol_version == 'HTTP/1.1':
 
690
            raise tests.KnownFailure(
 
691
                'pycurl hangs if the server send back garbage')
 
692
        super(TestInvalidStatusServer, self).test_http_has()
 
693
 
 
694
    def test_http_get(self):
 
695
        if self._testing_pycurl() and self._protocol_version == 'HTTP/1.1':
 
696
            raise tests.KnownFailure(
 
697
                'pycurl hangs if the server send back garbage')
 
698
        super(TestInvalidStatusServer, self).test_http_get()
 
699
 
 
700
 
 
701
class BadProtocolRequestHandler(http_server.TestingHTTPRequestHandler):
 
702
    """Whatever request comes in, returns a bad protocol version"""
 
703
 
 
704
    def parse_request(self):
 
705
        """Fakes handling a single HTTP request, returns a bad status"""
 
706
        ignored = http_server.TestingHTTPRequestHandler.parse_request(self)
 
707
        # Returns an invalid protocol version, but curl just
 
708
        # ignores it and those cannot be tested.
 
709
        self.wfile.write("%s %d %s\r\n" % ('HTTP/0.0',
 
710
                                           404,
 
711
                                           'Look at my protocol version'))
 
712
        return False
 
713
 
 
714
 
 
715
class TestBadProtocolServer(TestSpecificRequestHandler):
 
716
    """Tests bad protocol from server."""
 
717
 
 
718
    _req_handler_class = BadProtocolRequestHandler
 
719
 
 
720
    def setUp(self):
 
721
        if pycurl_present and self._transport == PyCurlTransport:
 
722
            raise tests.TestNotApplicable(
 
723
                "pycurl doesn't check the protocol version")
 
724
        super(TestBadProtocolServer, self).setUp()
 
725
 
 
726
    def test_http_has(self):
 
727
        server = self.get_readonly_server()
 
728
        t = self._transport(server.get_url())
 
729
        self.assertRaises(errors.InvalidHttpResponse, t.has, 'foo/bar')
 
730
 
 
731
    def test_http_get(self):
 
732
        server = self.get_readonly_server()
 
733
        t = self._transport(server.get_url())
 
734
        self.assertRaises(errors.InvalidHttpResponse, t.get, 'foo/bar')
 
735
 
 
736
 
 
737
class ForbiddenRequestHandler(http_server.TestingHTTPRequestHandler):
 
738
    """Whatever request comes in, returns a 403 code"""
 
739
 
 
740
    def parse_request(self):
 
741
        """Handle a single HTTP request, by replying we cannot handle it"""
 
742
        ignored = http_server.TestingHTTPRequestHandler.parse_request(self)
 
743
        self.send_error(403)
 
744
        return False
 
745
 
 
746
 
 
747
class TestForbiddenServer(TestSpecificRequestHandler):
 
748
    """Tests forbidden server"""
 
749
 
 
750
    _req_handler_class = ForbiddenRequestHandler
 
751
 
 
752
    def test_http_has(self):
 
753
        server = self.get_readonly_server()
 
754
        t = self._transport(server.get_url())
 
755
        self.assertRaises(errors.TransportError, t.has, 'foo/bar')
 
756
 
 
757
    def test_http_get(self):
 
758
        server = self.get_readonly_server()
 
759
        t = self._transport(server.get_url())
 
760
        self.assertRaises(errors.TransportError, t.get, 'foo/bar')
 
761
 
 
762
 
 
763
class TestRecordingServer(tests.TestCase):
 
764
 
 
765
    def test_create(self):
 
766
        server = RecordingServer(expect_body_tail=None)
 
767
        self.assertEqual('', server.received_bytes)
 
768
        self.assertEqual(None, server.host)
 
769
        self.assertEqual(None, server.port)
 
770
 
 
771
    def test_setUp_and_tearDown(self):
 
772
        server = RecordingServer(expect_body_tail=None)
 
773
        server.setUp()
 
774
        try:
 
775
            self.assertNotEqual(None, server.host)
 
776
            self.assertNotEqual(None, server.port)
 
777
        finally:
 
778
            server.tearDown()
 
779
        self.assertEqual(None, server.host)
 
780
        self.assertEqual(None, server.port)
 
781
 
 
782
    def test_send_receive_bytes(self):
 
783
        server = RecordingServer(expect_body_tail='c', scheme='http')
 
784
        self.start_server(server)
 
785
        sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
 
786
        sock.connect((server.host, server.port))
 
787
        sock.sendall('abc')
 
788
        self.assertEqual('HTTP/1.1 200 OK\r\n',
 
789
                         osutils.recv_all(sock, 4096))
 
790
        self.assertEqual('abc', server.received_bytes)
 
791
 
 
792
 
 
793
class TestRangeRequestServer(TestSpecificRequestHandler):
 
794
    """Tests readv requests against server.
 
795
 
 
796
    We test against default "normal" server.
 
797
    """
 
798
 
 
799
    def setUp(self):
 
800
        super(TestRangeRequestServer, self).setUp()
 
801
        self.build_tree_contents([('a', '0123456789')],)
 
802
 
 
803
    def test_readv(self):
 
804
        server = self.get_readonly_server()
 
805
        t = self._transport(server.get_url())
 
806
        l = list(t.readv('a', ((0, 1), (1, 1), (3, 2), (9, 1))))
 
807
        self.assertEqual(l[0], (0, '0'))
 
808
        self.assertEqual(l[1], (1, '1'))
 
809
        self.assertEqual(l[2], (3, '34'))
 
810
        self.assertEqual(l[3], (9, '9'))
 
811
 
 
812
    def test_readv_out_of_order(self):
 
813
        server = self.get_readonly_server()
 
814
        t = self._transport(server.get_url())
 
815
        l = list(t.readv('a', ((1, 1), (9, 1), (0, 1), (3, 2))))
 
816
        self.assertEqual(l[0], (1, '1'))
 
817
        self.assertEqual(l[1], (9, '9'))
 
818
        self.assertEqual(l[2], (0, '0'))
 
819
        self.assertEqual(l[3], (3, '34'))
 
820
 
 
821
    def test_readv_invalid_ranges(self):
 
822
        server = self.get_readonly_server()
 
823
        t = self._transport(server.get_url())
 
824
 
 
825
        # This is intentionally reading off the end of the file
 
826
        # since we are sure that it cannot get there
 
827
        self.assertListRaises((errors.InvalidRange, errors.ShortReadvError,),
 
828
                              t.readv, 'a', [(1,1), (8,10)])
 
829
 
 
830
        # This is trying to seek past the end of the file, it should
 
831
        # also raise a special error
 
832
        self.assertListRaises((errors.InvalidRange, errors.ShortReadvError,),
 
833
                              t.readv, 'a', [(12,2)])
 
834
 
 
835
    def test_readv_multiple_get_requests(self):
 
836
        server = self.get_readonly_server()
 
837
        t = self._transport(server.get_url())
 
838
        # force transport to issue multiple requests
 
839
        t._max_readv_combine = 1
 
840
        t._max_get_ranges = 1
 
841
        l = list(t.readv('a', ((0, 1), (1, 1), (3, 2), (9, 1))))
 
842
        self.assertEqual(l[0], (0, '0'))
 
843
        self.assertEqual(l[1], (1, '1'))
 
844
        self.assertEqual(l[2], (3, '34'))
 
845
        self.assertEqual(l[3], (9, '9'))
 
846
        # The server should have issued 4 requests
 
847
        self.assertEqual(4, server.GET_request_nb)
 
848
 
 
849
    def test_readv_get_max_size(self):
 
850
        server = self.get_readonly_server()
 
851
        t = self._transport(server.get_url())
 
852
        # force transport to issue multiple requests by limiting the number of
 
853
        # bytes by request. Note that this apply to coalesced offsets only, a
 
854
        # single range will keep its size even if bigger than the limit.
 
855
        t._get_max_size = 2
 
856
        l = list(t.readv('a', ((0, 1), (1, 1), (2, 4), (6, 4))))
 
857
        self.assertEqual(l[0], (0, '0'))
 
858
        self.assertEqual(l[1], (1, '1'))
 
859
        self.assertEqual(l[2], (2, '2345'))
 
860
        self.assertEqual(l[3], (6, '6789'))
 
861
        # The server should have issued 3 requests
 
862
        self.assertEqual(3, server.GET_request_nb)
 
863
 
 
864
    def test_complete_readv_leave_pipe_clean(self):
 
865
        server = self.get_readonly_server()
 
866
        t = self._transport(server.get_url())
 
867
        # force transport to issue multiple requests
 
868
        t._get_max_size = 2
 
869
        l = list(t.readv('a', ((0, 1), (1, 1), (2, 4), (6, 4))))
 
870
        # The server should have issued 3 requests
 
871
        self.assertEqual(3, server.GET_request_nb)
 
872
        self.assertEqual('0123456789', t.get_bytes('a'))
 
873
        self.assertEqual(4, server.GET_request_nb)
 
874
 
 
875
    def test_incomplete_readv_leave_pipe_clean(self):
 
876
        server = self.get_readonly_server()
 
877
        t = self._transport(server.get_url())
 
878
        # force transport to issue multiple requests
 
879
        t._get_max_size = 2
 
880
        # Don't collapse readv results into a list so that we leave unread
 
881
        # bytes on the socket
 
882
        ireadv = iter(t.readv('a', ((0, 1), (1, 1), (2, 4), (6, 4))))
 
883
        self.assertEqual((0, '0'), ireadv.next())
 
884
        # The server should have issued one request so far
 
885
        self.assertEqual(1, server.GET_request_nb)
 
886
        self.assertEqual('0123456789', t.get_bytes('a'))
 
887
        # get_bytes issued an additional request, the readv pending ones are
 
888
        # lost
 
889
        self.assertEqual(2, server.GET_request_nb)
 
890
 
 
891
 
 
892
class SingleRangeRequestHandler(http_server.TestingHTTPRequestHandler):
 
893
    """Always reply to range request as if they were single.
 
894
 
 
895
    Don't be explicit about it, just to annoy the clients.
 
896
    """
 
897
 
 
898
    def get_multiple_ranges(self, file, file_size, ranges):
 
899
        """Answer as if it was a single range request and ignores the rest"""
 
900
        (start, end) = ranges[0]
 
901
        return self.get_single_range(file, file_size, start, end)
 
902
 
 
903
 
 
904
class TestSingleRangeRequestServer(TestRangeRequestServer):
 
905
    """Test readv against a server which accept only single range requests"""
 
906
 
 
907
    _req_handler_class = SingleRangeRequestHandler
 
908
 
 
909
 
 
910
class SingleOnlyRangeRequestHandler(http_server.TestingHTTPRequestHandler):
 
911
    """Only reply to simple range requests, errors out on multiple"""
 
912
 
 
913
    def get_multiple_ranges(self, file, file_size, ranges):
 
914
        """Refuses the multiple ranges request"""
 
915
        if len(ranges) > 1:
 
916
            file.close()
 
917
            self.send_error(416, "Requested range not satisfiable")
 
918
            return
 
919
        (start, end) = ranges[0]
 
920
        return self.get_single_range(file, file_size, start, end)
 
921
 
 
922
 
 
923
class TestSingleOnlyRangeRequestServer(TestRangeRequestServer):
 
924
    """Test readv against a server which only accept single range requests"""
 
925
 
 
926
    _req_handler_class = SingleOnlyRangeRequestHandler
 
927
 
 
928
 
 
929
class NoRangeRequestHandler(http_server.TestingHTTPRequestHandler):
 
930
    """Ignore range requests without notice"""
 
931
 
 
932
    def do_GET(self):
 
933
        # Update the statistics
 
934
        self.server.test_case_server.GET_request_nb += 1
 
935
        # Just bypass the range handling done by TestingHTTPRequestHandler
 
936
        return SimpleHTTPServer.SimpleHTTPRequestHandler.do_GET(self)
 
937
 
 
938
 
 
939
class TestNoRangeRequestServer(TestRangeRequestServer):
 
940
    """Test readv against a server which do not accept range requests"""
 
941
 
 
942
    _req_handler_class = NoRangeRequestHandler
 
943
 
 
944
 
 
945
class MultipleRangeWithoutContentLengthRequestHandler(
 
946
    http_server.TestingHTTPRequestHandler):
 
947
    """Reply to multiple range requests without content length header."""
 
948
 
 
949
    def get_multiple_ranges(self, file, file_size, ranges):
 
950
        self.send_response(206)
 
951
        self.send_header('Accept-Ranges', 'bytes')
 
952
        boundary = "%d" % random.randint(0,0x7FFFFFFF)
 
953
        self.send_header("Content-Type",
 
954
                         "multipart/byteranges; boundary=%s" % boundary)
 
955
        self.end_headers()
 
956
        for (start, end) in ranges:
 
957
            self.wfile.write("--%s\r\n" % boundary)
 
958
            self.send_header("Content-type", 'application/octet-stream')
 
959
            self.send_header("Content-Range", "bytes %d-%d/%d" % (start,
 
960
                                                                  end,
 
961
                                                                  file_size))
 
962
            self.end_headers()
 
963
            self.send_range_content(file, start, end - start + 1)
 
964
        # Final boundary
 
965
        self.wfile.write("--%s\r\n" % boundary)
 
966
 
 
967
 
 
968
class TestMultipleRangeWithoutContentLengthServer(TestRangeRequestServer):
 
969
 
 
970
    _req_handler_class = MultipleRangeWithoutContentLengthRequestHandler
 
971
 
 
972
 
 
973
class TruncatedMultipleRangeRequestHandler(
 
974
    http_server.TestingHTTPRequestHandler):
 
975
    """Reply to multiple range requests truncating the last ones.
 
976
 
 
977
    This server generates responses whose Content-Length describes all the
 
978
    ranges, but fail to include the last ones leading to client short reads.
 
979
    This has been observed randomly with lighttpd (bug #179368).
 
980
    """
 
981
 
 
982
    _truncated_ranges = 2
 
983
 
 
984
    def get_multiple_ranges(self, file, file_size, ranges):
 
985
        self.send_response(206)
 
986
        self.send_header('Accept-Ranges', 'bytes')
 
987
        boundary = 'tagada'
 
988
        self.send_header('Content-Type',
 
989
                         'multipart/byteranges; boundary=%s' % boundary)
 
990
        boundary_line = '--%s\r\n' % boundary
 
991
        # Calculate the Content-Length
 
992
        content_length = 0
 
993
        for (start, end) in ranges:
 
994
            content_length += len(boundary_line)
 
995
            content_length += self._header_line_length(
 
996
                'Content-type', 'application/octet-stream')
 
997
            content_length += self._header_line_length(
 
998
                'Content-Range', 'bytes %d-%d/%d' % (start, end, file_size))
 
999
            content_length += len('\r\n') # end headers
 
1000
            content_length += end - start # + 1
 
1001
        content_length += len(boundary_line)
 
1002
        self.send_header('Content-length', content_length)
 
1003
        self.end_headers()
 
1004
 
 
1005
        # Send the multipart body
 
1006
        cur = 0
 
1007
        for (start, end) in ranges:
 
1008
            self.wfile.write(boundary_line)
 
1009
            self.send_header('Content-type', 'application/octet-stream')
 
1010
            self.send_header('Content-Range', 'bytes %d-%d/%d'
 
1011
                             % (start, end, file_size))
 
1012
            self.end_headers()
 
1013
            if cur + self._truncated_ranges >= len(ranges):
 
1014
                # Abruptly ends the response and close the connection
 
1015
                self.close_connection = 1
 
1016
                return
 
1017
            self.send_range_content(file, start, end - start + 1)
 
1018
            cur += 1
 
1019
        # No final boundary
 
1020
        self.wfile.write(boundary_line)
 
1021
 
 
1022
 
 
1023
class TestTruncatedMultipleRangeServer(TestSpecificRequestHandler):
 
1024
 
 
1025
    _req_handler_class = TruncatedMultipleRangeRequestHandler
 
1026
 
 
1027
    def setUp(self):
 
1028
        super(TestTruncatedMultipleRangeServer, self).setUp()
 
1029
        self.build_tree_contents([('a', '0123456789')],)
 
1030
 
 
1031
    def test_readv_with_short_reads(self):
 
1032
        server = self.get_readonly_server()
 
1033
        t = self._transport(server.get_url())
 
1034
        # Force separate ranges for each offset
 
1035
        t._bytes_to_read_before_seek = 0
 
1036
        ireadv = iter(t.readv('a', ((0, 1), (2, 1), (4, 2), (9, 1))))
 
1037
        self.assertEqual((0, '0'), ireadv.next())
 
1038
        self.assertEqual((2, '2'), ireadv.next())
 
1039
        if not self._testing_pycurl():
 
1040
            # Only one request have been issued so far (except for pycurl that
 
1041
            # try to read the whole response at once)
 
1042
            self.assertEqual(1, server.GET_request_nb)
 
1043
        self.assertEqual((4, '45'), ireadv.next())
 
1044
        self.assertEqual((9, '9'), ireadv.next())
 
1045
        # Both implementations issue 3 requests but:
 
1046
        # - urllib does two multiple (4 ranges, then 2 ranges) then a single
 
1047
        #   range,
 
1048
        # - pycurl does two multiple (4 ranges, 4 ranges) then a single range
 
1049
        self.assertEqual(3, server.GET_request_nb)
 
1050
        # Finally the client have tried a single range request and stays in
 
1051
        # that mode
 
1052
        self.assertEqual('single', t._range_hint)
 
1053
 
 
1054
class LimitedRangeRequestHandler(http_server.TestingHTTPRequestHandler):
 
1055
    """Errors out when range specifiers exceed the limit"""
 
1056
 
 
1057
    def get_multiple_ranges(self, file, file_size, ranges):
 
1058
        """Refuses the multiple ranges request"""
 
1059
        tcs = self.server.test_case_server
 
1060
        if tcs.range_limit is not None and len(ranges) > tcs.range_limit:
 
1061
            file.close()
 
1062
            # Emulate apache behavior
 
1063
            self.send_error(400, "Bad Request")
 
1064
            return
 
1065
        return http_server.TestingHTTPRequestHandler.get_multiple_ranges(
 
1066
            self, file, file_size, ranges)
 
1067
 
 
1068
 
 
1069
class LimitedRangeHTTPServer(http_server.HttpServer):
 
1070
    """An HttpServer erroring out on requests with too much range specifiers"""
 
1071
 
 
1072
    def __init__(self, request_handler=LimitedRangeRequestHandler,
 
1073
                 protocol_version=None,
 
1074
                 range_limit=None):
 
1075
        http_server.HttpServer.__init__(self, request_handler,
 
1076
                                        protocol_version=protocol_version)
 
1077
        self.range_limit = range_limit
 
1078
 
 
1079
 
 
1080
class TestLimitedRangeRequestServer(http_utils.TestCaseWithWebserver):
 
1081
    """Tests readv requests against a server erroring out on too much ranges."""
 
1082
 
 
1083
    # Requests with more range specifiers will error out
 
1084
    range_limit = 3
 
1085
 
 
1086
    def create_transport_readonly_server(self):
 
1087
        return LimitedRangeHTTPServer(range_limit=self.range_limit,
 
1088
                                      protocol_version=self._protocol_version)
 
1089
 
 
1090
    def get_transport(self):
 
1091
        return self._transport(self.get_readonly_server().get_url())
 
1092
 
 
1093
    def setUp(self):
 
1094
        http_utils.TestCaseWithWebserver.setUp(self)
 
1095
        # We need to manipulate ranges that correspond to real chunks in the
 
1096
        # response, so we build a content appropriately.
 
1097
        filler = ''.join(['abcdefghij' for x in range(102)])
 
1098
        content = ''.join(['%04d' % v + filler for v in range(16)])
 
1099
        self.build_tree_contents([('a', content)],)
 
1100
 
 
1101
    def test_few_ranges(self):
 
1102
        t = self.get_transport()
 
1103
        l = list(t.readv('a', ((0, 4), (1024, 4), )))
 
1104
        self.assertEqual(l[0], (0, '0000'))
 
1105
        self.assertEqual(l[1], (1024, '0001'))
 
1106
        self.assertEqual(1, self.get_readonly_server().GET_request_nb)
 
1107
 
 
1108
    def test_more_ranges(self):
 
1109
        t = self.get_transport()
 
1110
        l = list(t.readv('a', ((0, 4), (1024, 4), (4096, 4), (8192, 4))))
 
1111
        self.assertEqual(l[0], (0, '0000'))
 
1112
        self.assertEqual(l[1], (1024, '0001'))
 
1113
        self.assertEqual(l[2], (4096, '0004'))
 
1114
        self.assertEqual(l[3], (8192, '0008'))
 
1115
        # The server will refuse to serve the first request (too much ranges),
 
1116
        # a second request will succeed.
 
1117
        self.assertEqual(2, self.get_readonly_server().GET_request_nb)
 
1118
 
 
1119
 
 
1120
class TestHttpProxyWhiteBox(tests.TestCase):
 
1121
    """Whitebox test proxy http authorization.
 
1122
 
 
1123
    Only the urllib implementation is tested here.
 
1124
    """
 
1125
 
 
1126
    def setUp(self):
 
1127
        tests.TestCase.setUp(self)
 
1128
        self._old_env = {}
 
1129
 
 
1130
    def tearDown(self):
 
1131
        self._restore_env()
 
1132
        tests.TestCase.tearDown(self)
 
1133
 
 
1134
    def _install_env(self, env):
 
1135
        for name, value in env.iteritems():
 
1136
            self._old_env[name] = osutils.set_or_unset_env(name, value)
 
1137
 
 
1138
    def _restore_env(self):
 
1139
        for name, value in self._old_env.iteritems():
 
1140
            osutils.set_or_unset_env(name, value)
 
1141
 
 
1142
    def _proxied_request(self):
 
1143
        handler = _urllib2_wrappers.ProxyHandler()
 
1144
        request = _urllib2_wrappers.Request('GET','http://baz/buzzle')
 
1145
        handler.set_proxy(request, 'http')
 
1146
        return request
 
1147
 
 
1148
    def test_empty_user(self):
 
1149
        self._install_env({'http_proxy': 'http://bar.com'})
 
1150
        request = self._proxied_request()
 
1151
        self.assertFalse(request.headers.has_key('Proxy-authorization'))
 
1152
 
 
1153
    def test_invalid_proxy(self):
 
1154
        """A proxy env variable without scheme"""
 
1155
        self._install_env({'http_proxy': 'host:1234'})
 
1156
        self.assertRaises(errors.InvalidURL, self._proxied_request)
 
1157
 
 
1158
 
 
1159
class TestProxyHttpServer(http_utils.TestCaseWithTwoWebservers):
 
1160
    """Tests proxy server.
 
1161
 
 
1162
    Be aware that we do not setup a real proxy here. Instead, we
 
1163
    check that the *connection* goes through the proxy by serving
 
1164
    different content (the faked proxy server append '-proxied'
 
1165
    to the file names).
 
1166
    """
 
1167
 
 
1168
    # FIXME: We don't have an https server available, so we don't
 
1169
    # test https connections.
 
1170
 
 
1171
    def setUp(self):
 
1172
        super(TestProxyHttpServer, self).setUp()
 
1173
        self.build_tree_contents([('foo', 'contents of foo\n'),
 
1174
                                  ('foo-proxied', 'proxied contents of foo\n')])
 
1175
        # Let's setup some attributes for tests
 
1176
        self.server = self.get_readonly_server()
 
1177
        self.proxy_address = '%s:%d' % (self.server.host, self.server.port)
 
1178
        if self._testing_pycurl():
 
1179
            # Oh my ! pycurl does not check for the port as part of
 
1180
            # no_proxy :-( So we just test the host part
 
1181
            self.no_proxy_host = 'localhost'
 
1182
        else:
 
1183
            self.no_proxy_host = self.proxy_address
 
1184
        # The secondary server is the proxy
 
1185
        self.proxy = self.get_secondary_server()
 
1186
        self.proxy_url = self.proxy.get_url()
 
1187
        self._old_env = {}
 
1188
 
 
1189
    def _testing_pycurl(self):
 
1190
        return pycurl_present and self._transport == PyCurlTransport
 
1191
 
 
1192
    def create_transport_secondary_server(self):
 
1193
        """Creates an http server that will serve files with
 
1194
        '-proxied' appended to their names.
 
1195
        """
 
1196
        return http_utils.ProxyServer(protocol_version=self._protocol_version)
 
1197
 
 
1198
    def _install_env(self, env):
 
1199
        for name, value in env.iteritems():
 
1200
            self._old_env[name] = osutils.set_or_unset_env(name, value)
 
1201
 
 
1202
    def _restore_env(self):
 
1203
        for name, value in self._old_env.iteritems():
 
1204
            osutils.set_or_unset_env(name, value)
 
1205
 
 
1206
    def proxied_in_env(self, env):
 
1207
        self._install_env(env)
 
1208
        url = self.server.get_url()
 
1209
        t = self._transport(url)
 
1210
        try:
 
1211
            self.assertEqual('proxied contents of foo\n', t.get('foo').read())
 
1212
        finally:
 
1213
            self._restore_env()
 
1214
 
 
1215
    def not_proxied_in_env(self, env):
 
1216
        self._install_env(env)
 
1217
        url = self.server.get_url()
 
1218
        t = self._transport(url)
 
1219
        try:
 
1220
            self.assertEqual('contents of foo\n', t.get('foo').read())
 
1221
        finally:
 
1222
            self._restore_env()
 
1223
 
 
1224
    def test_http_proxy(self):
 
1225
        self.proxied_in_env({'http_proxy': self.proxy_url})
 
1226
 
 
1227
    def test_HTTP_PROXY(self):
 
1228
        if self._testing_pycurl():
 
1229
            # pycurl does not check HTTP_PROXY for security reasons
 
1230
            # (for use in a CGI context that we do not care
 
1231
            # about. Should we ?)
 
1232
            raise tests.TestNotApplicable(
 
1233
                'pycurl does not check HTTP_PROXY for security reasons')
 
1234
        self.proxied_in_env({'HTTP_PROXY': self.proxy_url})
 
1235
 
 
1236
    def test_all_proxy(self):
 
1237
        self.proxied_in_env({'all_proxy': self.proxy_url})
 
1238
 
 
1239
    def test_ALL_PROXY(self):
 
1240
        self.proxied_in_env({'ALL_PROXY': self.proxy_url})
 
1241
 
 
1242
    def test_http_proxy_with_no_proxy(self):
 
1243
        self.not_proxied_in_env({'http_proxy': self.proxy_url,
 
1244
                                 'no_proxy': self.no_proxy_host})
 
1245
 
 
1246
    def test_HTTP_PROXY_with_NO_PROXY(self):
 
1247
        if self._testing_pycurl():
 
1248
            raise tests.TestNotApplicable(
 
1249
                'pycurl does not check HTTP_PROXY for security reasons')
 
1250
        self.not_proxied_in_env({'HTTP_PROXY': self.proxy_url,
 
1251
                                 'NO_PROXY': self.no_proxy_host})
 
1252
 
 
1253
    def test_all_proxy_with_no_proxy(self):
 
1254
        self.not_proxied_in_env({'all_proxy': self.proxy_url,
 
1255
                                 'no_proxy': self.no_proxy_host})
 
1256
 
 
1257
    def test_ALL_PROXY_with_NO_PROXY(self):
 
1258
        self.not_proxied_in_env({'ALL_PROXY': self.proxy_url,
 
1259
                                 'NO_PROXY': self.no_proxy_host})
 
1260
 
 
1261
    def test_http_proxy_without_scheme(self):
 
1262
        if self._testing_pycurl():
 
1263
            # pycurl *ignores* invalid proxy env variables. If that ever change
 
1264
            # in the future, this test will fail indicating that pycurl do not
 
1265
            # ignore anymore such variables.
 
1266
            self.not_proxied_in_env({'http_proxy': self.proxy_address})
 
1267
        else:
 
1268
            self.assertRaises(errors.InvalidURL,
 
1269
                              self.proxied_in_env,
 
1270
                              {'http_proxy': self.proxy_address})
 
1271
 
 
1272
 
 
1273
class TestRanges(http_utils.TestCaseWithWebserver):
 
1274
    """Test the Range header in GET methods."""
 
1275
 
 
1276
    def setUp(self):
 
1277
        http_utils.TestCaseWithWebserver.setUp(self)
 
1278
        self.build_tree_contents([('a', '0123456789')],)
 
1279
        server = self.get_readonly_server()
 
1280
        self.transport = self._transport(server.get_url())
 
1281
 
 
1282
    def create_transport_readonly_server(self):
 
1283
        return http_server.HttpServer(protocol_version=self._protocol_version)
 
1284
 
 
1285
    def _file_contents(self, relpath, ranges):
 
1286
        offsets = [ (start, end - start + 1) for start, end in ranges]
 
1287
        coalesce = self.transport._coalesce_offsets
 
1288
        coalesced = list(coalesce(offsets, limit=0, fudge_factor=0))
 
1289
        code, data = self.transport._get(relpath, coalesced)
 
1290
        self.assertTrue(code in (200, 206),'_get returns: %d' % code)
 
1291
        for start, end in ranges:
 
1292
            data.seek(start)
 
1293
            yield data.read(end - start + 1)
 
1294
 
 
1295
    def _file_tail(self, relpath, tail_amount):
 
1296
        code, data = self.transport._get(relpath, [], tail_amount)
 
1297
        self.assertTrue(code in (200, 206),'_get returns: %d' % code)
 
1298
        data.seek(-tail_amount, 2)
 
1299
        return data.read(tail_amount)
 
1300
 
 
1301
    def test_range_header(self):
 
1302
        # Valid ranges
 
1303
        map(self.assertEqual,['0', '234'],
 
1304
            list(self._file_contents('a', [(0,0), (2,4)])),)
 
1305
 
 
1306
    def test_range_header_tail(self):
 
1307
        self.assertEqual('789', self._file_tail('a', 3))
 
1308
 
 
1309
    def test_syntactically_invalid_range_header(self):
 
1310
        self.assertListRaises(errors.InvalidHttpRange,
 
1311
                          self._file_contents, 'a', [(4, 3)])
 
1312
 
 
1313
    def test_semantically_invalid_range_header(self):
 
1314
        self.assertListRaises(errors.InvalidHttpRange,
 
1315
                          self._file_contents, 'a', [(42, 128)])
 
1316
 
 
1317
 
 
1318
class TestHTTPRedirections(http_utils.TestCaseWithRedirectedWebserver):
 
1319
    """Test redirection between http servers."""
 
1320
 
 
1321
    def create_transport_secondary_server(self):
 
1322
        """Create the secondary server redirecting to the primary server"""
 
1323
        new = self.get_readonly_server()
 
1324
 
 
1325
        redirecting = http_utils.HTTPServerRedirecting(
 
1326
            protocol_version=self._protocol_version)
 
1327
        redirecting.redirect_to(new.host, new.port)
 
1328
        return redirecting
 
1329
 
 
1330
    def setUp(self):
 
1331
        super(TestHTTPRedirections, self).setUp()
 
1332
        self.build_tree_contents([('a', '0123456789'),
 
1333
                                  ('bundle',
 
1334
                                  '# Bazaar revision bundle v0.9\n#\n')
 
1335
                                  ],)
 
1336
        # The requests to the old server will be redirected to the new server
 
1337
        self.old_transport = self._transport(self.old_server.get_url())
 
1338
 
 
1339
    def test_redirected(self):
 
1340
        self.assertRaises(errors.RedirectRequested, self.old_transport.get, 'a')
 
1341
        t = self._transport(self.new_server.get_url())
 
1342
        self.assertEqual('0123456789', t.get('a').read())
 
1343
 
 
1344
    def test_read_redirected_bundle_from_url(self):
 
1345
        from bzrlib.bundle import read_bundle_from_url
 
1346
        url = self.old_transport.abspath('bundle')
 
1347
        bundle = self.applyDeprecated(deprecated_in((1, 12, 0)),
 
1348
                read_bundle_from_url, url)
 
1349
        # If read_bundle_from_url was successful we get an empty bundle
 
1350
        self.assertEqual([], bundle.revisions)
 
1351
 
 
1352
 
 
1353
class RedirectedRequest(_urllib2_wrappers.Request):
 
1354
    """Request following redirections. """
 
1355
 
 
1356
    init_orig = _urllib2_wrappers.Request.__init__
 
1357
 
 
1358
    def __init__(self, method, url, *args, **kwargs):
 
1359
        """Constructor.
 
1360
 
 
1361
        """
 
1362
        # Since the tests using this class will replace
 
1363
        # _urllib2_wrappers.Request, we can't just call the base class __init__
 
1364
        # or we'll loop.
 
1365
        RedirectedRequest.init_orig(self, method, url, *args, **kwargs)
 
1366
        self.follow_redirections = True
 
1367
 
 
1368
 
 
1369
class TestHTTPSilentRedirections(http_utils.TestCaseWithRedirectedWebserver):
 
1370
    """Test redirections.
 
1371
 
 
1372
    http implementations do not redirect silently anymore (they
 
1373
    do not redirect at all in fact). The mechanism is still in
 
1374
    place at the _urllib2_wrappers.Request level and these tests
 
1375
    exercise it.
 
1376
 
 
1377
    For the pycurl implementation
 
1378
    the redirection have been deleted as we may deprecate pycurl
 
1379
    and I have no place to keep a working implementation.
 
1380
    -- vila 20070212
 
1381
    """
 
1382
 
 
1383
    def setUp(self):
 
1384
        if pycurl_present and self._transport == PyCurlTransport:
 
1385
            raise tests.TestNotApplicable(
 
1386
                "pycurl doesn't redirect silently annymore")
 
1387
        super(TestHTTPSilentRedirections, self).setUp()
 
1388
        self.setup_redirected_request()
 
1389
        self.addCleanup(self.cleanup_redirected_request)
 
1390
        self.build_tree_contents([('a','a'),
 
1391
                                  ('1/',),
 
1392
                                  ('1/a', 'redirected once'),
 
1393
                                  ('2/',),
 
1394
                                  ('2/a', 'redirected twice'),
 
1395
                                  ('3/',),
 
1396
                                  ('3/a', 'redirected thrice'),
 
1397
                                  ('4/',),
 
1398
                                  ('4/a', 'redirected 4 times'),
 
1399
                                  ('5/',),
 
1400
                                  ('5/a', 'redirected 5 times'),
 
1401
                                  ],)
 
1402
 
 
1403
        self.old_transport = self._transport(self.old_server.get_url())
 
1404
 
 
1405
    def setup_redirected_request(self):
 
1406
        self.original_class = _urllib2_wrappers.Request
 
1407
        _urllib2_wrappers.Request = RedirectedRequest
 
1408
 
 
1409
    def cleanup_redirected_request(self):
 
1410
        _urllib2_wrappers.Request = self.original_class
 
1411
 
 
1412
    def create_transport_secondary_server(self):
 
1413
        """Create the secondary server, redirections are defined in the tests"""
 
1414
        return http_utils.HTTPServerRedirecting(
 
1415
            protocol_version=self._protocol_version)
 
1416
 
 
1417
    def test_one_redirection(self):
 
1418
        t = self.old_transport
 
1419
 
 
1420
        req = RedirectedRequest('GET', t.abspath('a'))
 
1421
        req.follow_redirections = True
 
1422
        new_prefix = 'http://%s:%s' % (self.new_server.host,
 
1423
                                       self.new_server.port)
 
1424
        self.old_server.redirections = \
 
1425
            [('(.*)', r'%s/1\1' % (new_prefix), 301),]
 
1426
        self.assertEquals('redirected once',t._perform(req).read())
 
1427
 
 
1428
    def test_five_redirections(self):
 
1429
        t = self.old_transport
 
1430
 
 
1431
        req = RedirectedRequest('GET', t.abspath('a'))
 
1432
        req.follow_redirections = True
 
1433
        old_prefix = 'http://%s:%s' % (self.old_server.host,
 
1434
                                       self.old_server.port)
 
1435
        new_prefix = 'http://%s:%s' % (self.new_server.host,
 
1436
                                       self.new_server.port)
 
1437
        self.old_server.redirections = [
 
1438
            ('/1(.*)', r'%s/2\1' % (old_prefix), 302),
 
1439
            ('/2(.*)', r'%s/3\1' % (old_prefix), 303),
 
1440
            ('/3(.*)', r'%s/4\1' % (old_prefix), 307),
 
1441
            ('/4(.*)', r'%s/5\1' % (new_prefix), 301),
 
1442
            ('(/[^/]+)', r'%s/1\1' % (old_prefix), 301),
 
1443
            ]
 
1444
        self.assertEquals('redirected 5 times',t._perform(req).read())
 
1445
 
 
1446
 
 
1447
class TestDoCatchRedirections(http_utils.TestCaseWithRedirectedWebserver):
 
1448
    """Test transport.do_catching_redirections."""
 
1449
 
 
1450
    def setUp(self):
 
1451
        super(TestDoCatchRedirections, self).setUp()
 
1452
        self.build_tree_contents([('a', '0123456789'),],)
 
1453
 
 
1454
        self.old_transport = self._transport(self.old_server.get_url())
 
1455
 
 
1456
    def get_a(self, transport):
 
1457
        return transport.get('a')
 
1458
 
 
1459
    def test_no_redirection(self):
 
1460
        t = self._transport(self.new_server.get_url())
 
1461
 
 
1462
        # We use None for redirected so that we fail if redirected
 
1463
        self.assertEquals('0123456789',
 
1464
                          transport.do_catching_redirections(
 
1465
                self.get_a, t, None).read())
 
1466
 
 
1467
    def test_one_redirection(self):
 
1468
        self.redirections = 0
 
1469
 
 
1470
        def redirected(transport, exception, redirection_notice):
 
1471
            self.redirections += 1
 
1472
            dir, file = urlutils.split(exception.target)
 
1473
            return self._transport(dir)
 
1474
 
 
1475
        self.assertEquals('0123456789',
 
1476
                          transport.do_catching_redirections(
 
1477
                self.get_a, self.old_transport, redirected).read())
 
1478
        self.assertEquals(1, self.redirections)
 
1479
 
 
1480
    def test_redirection_loop(self):
 
1481
 
 
1482
        def redirected(transport, exception, redirection_notice):
 
1483
            # By using the redirected url as a base dir for the
 
1484
            # *old* transport, we create a loop: a => a/a =>
 
1485
            # a/a/a
 
1486
            return self.old_transport.clone(exception.target)
 
1487
 
 
1488
        self.assertRaises(errors.TooManyRedirections,
 
1489
                          transport.do_catching_redirections,
 
1490
                          self.get_a, self.old_transport, redirected)
 
1491
 
 
1492
 
 
1493
class TestAuth(http_utils.TestCaseWithWebserver):
 
1494
    """Test authentication scheme"""
 
1495
 
 
1496
    _auth_header = 'Authorization'
 
1497
    _password_prompt_prefix = ''
 
1498
    _username_prompt_prefix = ''
 
1499
    # Set by load_tests
 
1500
    _auth_server = None
 
1501
 
 
1502
    def setUp(self):
 
1503
        super(TestAuth, self).setUp()
 
1504
        self.server = self.get_readonly_server()
 
1505
        self.build_tree_contents([('a', 'contents of a\n'),
 
1506
                                  ('b', 'contents of b\n'),])
 
1507
 
 
1508
    def create_transport_readonly_server(self):
 
1509
        return self._auth_server(protocol_version=self._protocol_version)
 
1510
 
 
1511
    def _testing_pycurl(self):
 
1512
        return pycurl_present and self._transport == PyCurlTransport
 
1513
 
 
1514
    def get_user_url(self, user, password):
 
1515
        """Build an url embedding user and password"""
 
1516
        url = '%s://' % self.server._url_protocol
 
1517
        if user is not None:
 
1518
            url += user
 
1519
            if password is not None:
 
1520
                url += ':' + password
 
1521
            url += '@'
 
1522
        url += '%s:%s/' % (self.server.host, self.server.port)
 
1523
        return url
 
1524
 
 
1525
    def get_user_transport(self, user, password):
 
1526
        return self._transport(self.get_user_url(user, password))
 
1527
 
 
1528
    def test_no_user(self):
 
1529
        self.server.add_user('joe', 'foo')
 
1530
        t = self.get_user_transport(None, None)
 
1531
        self.assertRaises(errors.InvalidHttpResponse, t.get, 'a')
 
1532
        # Only one 'Authentication Required' error should occur
 
1533
        self.assertEqual(1, self.server.auth_required_errors)
 
1534
 
 
1535
    def test_empty_pass(self):
 
1536
        self.server.add_user('joe', '')
 
1537
        t = self.get_user_transport('joe', '')
 
1538
        self.assertEqual('contents of a\n', t.get('a').read())
 
1539
        # Only one 'Authentication Required' error should occur
 
1540
        self.assertEqual(1, self.server.auth_required_errors)
 
1541
 
 
1542
    def test_user_pass(self):
 
1543
        self.server.add_user('joe', 'foo')
 
1544
        t = self.get_user_transport('joe', 'foo')
 
1545
        self.assertEqual('contents of a\n', t.get('a').read())
 
1546
        # Only one 'Authentication Required' error should occur
 
1547
        self.assertEqual(1, self.server.auth_required_errors)
 
1548
 
 
1549
    def test_unknown_user(self):
 
1550
        self.server.add_user('joe', 'foo')
 
1551
        t = self.get_user_transport('bill', 'foo')
 
1552
        self.assertRaises(errors.InvalidHttpResponse, t.get, 'a')
 
1553
        # Two 'Authentication Required' errors should occur (the
 
1554
        # initial 'who are you' and 'I don't know you, who are
 
1555
        # you').
 
1556
        self.assertEqual(2, self.server.auth_required_errors)
 
1557
 
 
1558
    def test_wrong_pass(self):
 
1559
        self.server.add_user('joe', 'foo')
 
1560
        t = self.get_user_transport('joe', 'bar')
 
1561
        self.assertRaises(errors.InvalidHttpResponse, t.get, 'a')
 
1562
        # Two 'Authentication Required' errors should occur (the
 
1563
        # initial 'who are you' and 'this is not you, who are you')
 
1564
        self.assertEqual(2, self.server.auth_required_errors)
 
1565
 
 
1566
    def test_prompt_for_username(self):
 
1567
        if self._testing_pycurl():
 
1568
            raise tests.TestNotApplicable(
 
1569
                'pycurl cannot prompt, it handles auth by embedding'
 
1570
                ' user:pass in urls only')
 
1571
 
 
1572
        self.server.add_user('joe', 'foo')
 
1573
        t = self.get_user_transport(None, None)
 
1574
        stdout = tests.StringIOWrapper()
 
1575
        stderr = tests.StringIOWrapper()
 
1576
        ui.ui_factory = tests.TestUIFactory(stdin='joe\nfoo\n',
 
1577
                                            stdout=stdout, stderr=stderr)
 
1578
        self.assertEqual('contents of a\n',t.get('a').read())
 
1579
        # stdin should be empty
 
1580
        self.assertEqual('', ui.ui_factory.stdin.readline())
 
1581
        stderr.seek(0)
 
1582
        expected_prompt = self._expected_username_prompt(t._unqualified_scheme)
 
1583
        self.assertEquals(expected_prompt, stderr.read(len(expected_prompt)))
 
1584
        self.assertEquals('', stdout.getvalue())
 
1585
        self._check_password_prompt(t._unqualified_scheme, 'joe',
 
1586
                                    stderr.readline())
 
1587
 
 
1588
    def test_prompt_for_password(self):
 
1589
        if self._testing_pycurl():
 
1590
            raise tests.TestNotApplicable(
 
1591
                'pycurl cannot prompt, it handles auth by embedding'
 
1592
                ' user:pass in urls only')
 
1593
 
 
1594
        self.server.add_user('joe', 'foo')
 
1595
        t = self.get_user_transport('joe', None)
 
1596
        stdout = tests.StringIOWrapper()
 
1597
        stderr = tests.StringIOWrapper()
 
1598
        ui.ui_factory = tests.TestUIFactory(stdin='foo\n',
 
1599
                                            stdout=stdout, stderr=stderr)
 
1600
        self.assertEqual('contents of a\n', t.get('a').read())
 
1601
        # stdin should be empty
 
1602
        self.assertEqual('', ui.ui_factory.stdin.readline())
 
1603
        self._check_password_prompt(t._unqualified_scheme, 'joe',
 
1604
                                    stderr.getvalue())
 
1605
        self.assertEquals('', stdout.getvalue())
 
1606
        # And we shouldn't prompt again for a different request
 
1607
        # against the same transport.
 
1608
        self.assertEqual('contents of b\n',t.get('b').read())
 
1609
        t2 = t.clone()
 
1610
        # And neither against a clone
 
1611
        self.assertEqual('contents of b\n',t2.get('b').read())
 
1612
        # Only one 'Authentication Required' error should occur
 
1613
        self.assertEqual(1, self.server.auth_required_errors)
 
1614
 
 
1615
    def _check_password_prompt(self, scheme, user, actual_prompt):
 
1616
        expected_prompt = (self._password_prompt_prefix
 
1617
                           + ("%s %s@%s:%d, Realm: '%s' password: "
 
1618
                              % (scheme.upper(),
 
1619
                                 user, self.server.host, self.server.port,
 
1620
                                 self.server.auth_realm)))
 
1621
        self.assertEquals(expected_prompt, actual_prompt)
 
1622
 
 
1623
    def _expected_username_prompt(self, scheme):
 
1624
        return (self._username_prompt_prefix
 
1625
                + "%s %s:%d, Realm: '%s' username: " % (scheme.upper(),
 
1626
                                 self.server.host, self.server.port,
 
1627
                                 self.server.auth_realm))
 
1628
 
 
1629
    def test_no_prompt_for_password_when_using_auth_config(self):
 
1630
        if self._testing_pycurl():
 
1631
            raise tests.TestNotApplicable(
 
1632
                'pycurl does not support authentication.conf'
 
1633
                ' since it cannot prompt')
 
1634
 
 
1635
        user =' joe'
 
1636
        password = 'foo'
 
1637
        stdin_content = 'bar\n'  # Not the right password
 
1638
        self.server.add_user(user, password)
 
1639
        t = self.get_user_transport(user, None)
 
1640
        ui.ui_factory = tests.TestUIFactory(stdin=stdin_content,
 
1641
                                            stdout=tests.StringIOWrapper())
 
1642
        # Create a minimal config file with the right password
 
1643
        conf = config.AuthenticationConfig()
 
1644
        conf._get_config().update(
 
1645
            {'httptest': {'scheme': 'http', 'port': self.server.port,
 
1646
                          'user': user, 'password': password}})
 
1647
        conf._save()
 
1648
        # Issue a request to the server to connect
 
1649
        self.assertEqual('contents of a\n',t.get('a').read())
 
1650
        # stdin should have  been left untouched
 
1651
        self.assertEqual(stdin_content, ui.ui_factory.stdin.readline())
 
1652
        # Only one 'Authentication Required' error should occur
 
1653
        self.assertEqual(1, self.server.auth_required_errors)
 
1654
 
 
1655
    def test_user_from_auth_conf(self):
 
1656
        if self._testing_pycurl():
 
1657
            raise tests.TestNotApplicable(
 
1658
                'pycurl does not support authentication.conf')
 
1659
        user = 'joe'
 
1660
        password = 'foo'
 
1661
        self.server.add_user(user, password)
 
1662
        # Create a minimal config file with the right password
 
1663
        conf = config.AuthenticationConfig()
 
1664
        conf._get_config().update(
 
1665
            {'httptest': {'scheme': 'http', 'port': self.server.port,
 
1666
                          'user': user, 'password': password}})
 
1667
        conf._save()
 
1668
        t = self.get_user_transport(None, None)
 
1669
        # Issue a request to the server to connect
 
1670
        self.assertEqual('contents of a\n', t.get('a').read())
 
1671
        # Only one 'Authentication Required' error should occur
 
1672
        self.assertEqual(1, self.server.auth_required_errors)
 
1673
 
 
1674
    def test_changing_nonce(self):
 
1675
        if self._auth_server not in (http_utils.HTTPDigestAuthServer,
 
1676
                                     http_utils.ProxyDigestAuthServer):
 
1677
            raise tests.TestNotApplicable('HTTP/proxy auth digest only test')
 
1678
        if self._testing_pycurl():
 
1679
            raise tests.KnownFailure(
 
1680
                'pycurl does not handle a nonce change')
 
1681
        self.server.add_user('joe', 'foo')
 
1682
        t = self.get_user_transport('joe', 'foo')
 
1683
        self.assertEqual('contents of a\n', t.get('a').read())
 
1684
        self.assertEqual('contents of b\n', t.get('b').read())
 
1685
        # Only one 'Authentication Required' error should have
 
1686
        # occured so far
 
1687
        self.assertEqual(1, self.server.auth_required_errors)
 
1688
        # The server invalidates the current nonce
 
1689
        self.server.auth_nonce = self.server.auth_nonce + '. No, now!'
 
1690
        self.assertEqual('contents of a\n', t.get('a').read())
 
1691
        # Two 'Authentication Required' errors should occur (the
 
1692
        # initial 'who are you' and a second 'who are you' with the new nonce)
 
1693
        self.assertEqual(2, self.server.auth_required_errors)
 
1694
 
 
1695
 
 
1696
 
 
1697
class TestProxyAuth(TestAuth):
 
1698
    """Test proxy authentication schemes."""
 
1699
 
 
1700
    _auth_header = 'Proxy-authorization'
 
1701
    _password_prompt_prefix = 'Proxy '
 
1702
    _username_prompt_prefix = 'Proxy '
 
1703
 
 
1704
    def setUp(self):
 
1705
        super(TestProxyAuth, self).setUp()
 
1706
        self._old_env = {}
 
1707
        self.addCleanup(self._restore_env)
 
1708
        # Override the contents to avoid false positives
 
1709
        self.build_tree_contents([('a', 'not proxied contents of a\n'),
 
1710
                                  ('b', 'not proxied contents of b\n'),
 
1711
                                  ('a-proxied', 'contents of a\n'),
 
1712
                                  ('b-proxied', 'contents of b\n'),
 
1713
                                  ])
 
1714
 
 
1715
    def get_user_transport(self, user, password):
 
1716
        self._install_env({'all_proxy': self.get_user_url(user, password)})
 
1717
        return self._transport(self.server.get_url())
 
1718
 
 
1719
    def _install_env(self, env):
 
1720
        for name, value in env.iteritems():
 
1721
            self._old_env[name] = osutils.set_or_unset_env(name, value)
 
1722
 
 
1723
    def _restore_env(self):
 
1724
        for name, value in self._old_env.iteritems():
 
1725
            osutils.set_or_unset_env(name, value)
 
1726
 
 
1727
    def test_empty_pass(self):
 
1728
        if self._testing_pycurl():
 
1729
            import pycurl
 
1730
            if pycurl.version_info()[1] < '7.16.0':
 
1731
                raise tests.KnownFailure(
 
1732
                    'pycurl < 7.16.0 does not handle empty proxy passwords')
 
1733
        super(TestProxyAuth, self).test_empty_pass()
 
1734
 
 
1735
 
 
1736
class SampleSocket(object):
 
1737
    """A socket-like object for use in testing the HTTP request handler."""
 
1738
 
 
1739
    def __init__(self, socket_read_content):
 
1740
        """Constructs a sample socket.
 
1741
 
 
1742
        :param socket_read_content: a byte sequence
 
1743
        """
 
1744
        # Use plain python StringIO so we can monkey-patch the close method to
 
1745
        # not discard the contents.
 
1746
        from StringIO import StringIO
 
1747
        self.readfile = StringIO(socket_read_content)
 
1748
        self.writefile = StringIO()
 
1749
        self.writefile.close = lambda: None
 
1750
 
 
1751
    def makefile(self, mode='r', bufsize=None):
 
1752
        if 'r' in mode:
 
1753
            return self.readfile
 
1754
        else:
 
1755
            return self.writefile
 
1756
 
 
1757
 
 
1758
class SmartHTTPTunnellingTest(tests.TestCaseWithTransport):
 
1759
 
 
1760
    def setUp(self):
 
1761
        super(SmartHTTPTunnellingTest, self).setUp()
 
1762
        # We use the VFS layer as part of HTTP tunnelling tests.
 
1763
        self._captureVar('BZR_NO_SMART_VFS', None)
 
1764
        self.transport_readonly_server = http_utils.HTTPServerWithSmarts
 
1765
 
 
1766
    def create_transport_readonly_server(self):
 
1767
        return http_utils.HTTPServerWithSmarts(
 
1768
            protocol_version=self._protocol_version)
 
1769
 
 
1770
    def test_open_bzrdir(self):
 
1771
        branch = self.make_branch('relpath')
 
1772
        http_server = self.get_readonly_server()
 
1773
        url = http_server.get_url() + 'relpath'
 
1774
        bd = bzrdir.BzrDir.open(url)
 
1775
        self.assertIsInstance(bd, _mod_remote.RemoteBzrDir)
 
1776
 
 
1777
    def test_bulk_data(self):
 
1778
        # We should be able to send and receive bulk data in a single message.
 
1779
        # The 'readv' command in the smart protocol both sends and receives
 
1780
        # bulk data, so we use that.
 
1781
        self.build_tree(['data-file'])
 
1782
        http_server = self.get_readonly_server()
 
1783
        http_transport = self._transport(http_server.get_url())
 
1784
        medium = http_transport.get_smart_medium()
 
1785
        # Since we provide the medium, the url below will be mostly ignored
 
1786
        # during the test, as long as the path is '/'.
 
1787
        remote_transport = remote.RemoteTransport('bzr://fake_host/',
 
1788
                                                  medium=medium)
 
1789
        self.assertEqual(
 
1790
            [(0, "c")], list(remote_transport.readv("data-file", [(0,1)])))
 
1791
 
 
1792
    def test_http_send_smart_request(self):
 
1793
 
 
1794
        post_body = 'hello\n'
 
1795
        expected_reply_body = 'ok\x012\n'
 
1796
 
 
1797
        http_server = self.get_readonly_server()
 
1798
        http_transport = self._transport(http_server.get_url())
 
1799
        medium = http_transport.get_smart_medium()
 
1800
        response = medium.send_http_smart_request(post_body)
 
1801
        reply_body = response.read()
 
1802
        self.assertEqual(expected_reply_body, reply_body)
 
1803
 
 
1804
    def test_smart_http_server_post_request_handler(self):
 
1805
        httpd = self.get_readonly_server()._get_httpd()
 
1806
 
 
1807
        socket = SampleSocket(
 
1808
            'POST /.bzr/smart %s \r\n' % self._protocol_version
 
1809
            # HTTP/1.1 posts must have a Content-Length (but it doesn't hurt
 
1810
            # for 1.0)
 
1811
            + 'Content-Length: 6\r\n'
 
1812
            '\r\n'
 
1813
            'hello\n')
 
1814
        # Beware: the ('localhost', 80) below is the
 
1815
        # client_address parameter, but we don't have one because
 
1816
        # we have defined a socket which is not bound to an
 
1817
        # address. The test framework never uses this client
 
1818
        # address, so far...
 
1819
        request_handler = http_utils.SmartRequestHandler(socket,
 
1820
                                                         ('localhost', 80),
 
1821
                                                         httpd)
 
1822
        response = socket.writefile.getvalue()
 
1823
        self.assertStartsWith(response, '%s 200 ' % self._protocol_version)
 
1824
        # This includes the end of the HTTP headers, and all the body.
 
1825
        expected_end_of_response = '\r\n\r\nok\x012\n'
 
1826
        self.assertEndsWith(response, expected_end_of_response)
 
1827
 
 
1828
 
 
1829
class ForbiddenRequestHandler(http_server.TestingHTTPRequestHandler):
 
1830
    """No smart server here request handler."""
 
1831
 
 
1832
    def do_POST(self):
 
1833
        self.send_error(403, "Forbidden")
 
1834
 
 
1835
 
 
1836
class SmartClientAgainstNotSmartServer(TestSpecificRequestHandler):
 
1837
    """Test smart client behaviour against an http server without smarts."""
 
1838
 
 
1839
    _req_handler_class = ForbiddenRequestHandler
 
1840
 
 
1841
    def test_probe_smart_server(self):
 
1842
        """Test error handling against server refusing smart requests."""
 
1843
        server = self.get_readonly_server()
 
1844
        t = self._transport(server.get_url())
 
1845
        # No need to build a valid smart request here, the server will not even
 
1846
        # try to interpret it.
 
1847
        self.assertRaises(errors.SmartProtocolError,
 
1848
                          t.get_smart_medium().send_http_smart_request,
 
1849
                          'whatever')
 
1850
 
 
1851
class Test_redirected_to(tests.TestCase):
 
1852
 
 
1853
    def test_redirected_to_subdir(self):
 
1854
        t = self._transport('http://www.example.com/foo')
 
1855
        r = t._redirected_to('http://www.example.com/foo',
 
1856
                             'http://www.example.com/foo/subdir')
 
1857
        self.assertIsInstance(r, type(t))
 
1858
        # Both transports share the some connection
 
1859
        self.assertEquals(t._get_connection(), r._get_connection())
 
1860
 
 
1861
    def test_redirected_to_self_with_slash(self):
 
1862
        t = self._transport('http://www.example.com/foo')
 
1863
        r = t._redirected_to('http://www.example.com/foo',
 
1864
                             'http://www.example.com/foo/')
 
1865
        self.assertIsInstance(r, type(t))
 
1866
        # Both transports share the some connection (one can argue that we
 
1867
        # should return the exact same transport here, but that seems
 
1868
        # overkill).
 
1869
        self.assertEquals(t._get_connection(), r._get_connection())
 
1870
 
 
1871
    def test_redirected_to_host(self):
 
1872
        t = self._transport('http://www.example.com/foo')
 
1873
        r = t._redirected_to('http://www.example.com/foo',
 
1874
                             'http://foo.example.com/foo/subdir')
 
1875
        self.assertIsInstance(r, type(t))
 
1876
 
 
1877
    def test_redirected_to_same_host_sibling_protocol(self):
 
1878
        t = self._transport('http://www.example.com/foo')
 
1879
        r = t._redirected_to('http://www.example.com/foo',
 
1880
                             'https://www.example.com/foo')
 
1881
        self.assertIsInstance(r, type(t))
 
1882
 
 
1883
    def test_redirected_to_same_host_different_protocol(self):
 
1884
        t = self._transport('http://www.example.com/foo')
 
1885
        r = t._redirected_to('http://www.example.com/foo',
 
1886
                             'ftp://www.example.com/foo')
 
1887
        self.assertNotEquals(type(r), type(t))
 
1888
 
 
1889
    def test_redirected_to_different_host_same_user(self):
 
1890
        t = self._transport('http://joe@www.example.com/foo')
 
1891
        r = t._redirected_to('http://www.example.com/foo',
 
1892
                             'https://foo.example.com/foo')
 
1893
        self.assertIsInstance(r, type(t))
 
1894
        self.assertEquals(t._user, r._user)
 
1895
 
 
1896
 
 
1897
class PredefinedRequestHandler(http_server.TestingHTTPRequestHandler):
 
1898
    """Request handler for a unique and pre-defined request.
 
1899
 
 
1900
    The only thing we care about here is how many bytes travel on the wire. But
 
1901
    since we want to measure it for a real http client, we have to send it
 
1902
    correct responses.
 
1903
 
 
1904
    We expect to receive a *single* request nothing more (and we won't even
 
1905
    check what request it is, we just measure the bytes read until an empty
 
1906
    line.
 
1907
    """
 
1908
 
 
1909
    def handle_one_request(self):
 
1910
        tcs = self.server.test_case_server
 
1911
        requestline = self.rfile.readline()
 
1912
        headers = self.MessageClass(self.rfile, 0)
 
1913
        # We just read: the request, the headers, an empty line indicating the
 
1914
        # end of the headers.
 
1915
        bytes_read = len(requestline)
 
1916
        for line in headers.headers:
 
1917
            bytes_read += len(line)
 
1918
        bytes_read += len('\r\n')
 
1919
        if requestline.startswith('POST'):
 
1920
            # The body should be a single line (or we don't know where it ends
 
1921
            # and we don't want to issue a blocking read)
 
1922
            body = self.rfile.readline()
 
1923
            bytes_read += len(body)
 
1924
        tcs.bytes_read = bytes_read
 
1925
 
 
1926
        # We set the bytes written *before* issuing the write, the client is
 
1927
        # supposed to consume every produced byte *before* checking that value.
 
1928
 
 
1929
        # Doing the oppposite may lead to test failure: we may be interrupted
 
1930
        # after the write but before updating the value. The client can then
 
1931
        # continue and read the value *before* we can update it. And yes,
 
1932
        # this has been observed -- vila 20090129
 
1933
        tcs.bytes_written = len(tcs.canned_response)
 
1934
        self.wfile.write(tcs.canned_response)
 
1935
 
 
1936
 
 
1937
class ActivityServerMixin(object):
 
1938
 
 
1939
    def __init__(self, protocol_version):
 
1940
        super(ActivityServerMixin, self).__init__(
 
1941
            request_handler=PredefinedRequestHandler,
 
1942
            protocol_version=protocol_version)
 
1943
        # Bytes read and written by the server
 
1944
        self.bytes_read = 0
 
1945
        self.bytes_written = 0
 
1946
        self.canned_response = None
 
1947
 
 
1948
 
 
1949
class ActivityHTTPServer(ActivityServerMixin, http_server.HttpServer):
 
1950
    pass
 
1951
 
 
1952
 
 
1953
if tests.HTTPSServerFeature.available():
 
1954
    from bzrlib.tests import https_server
 
1955
    class ActivityHTTPSServer(ActivityServerMixin, https_server.HTTPSServer):
 
1956
        pass
 
1957
 
 
1958
 
 
1959
class TestActivity(tests.TestCase):
 
1960
    """Test socket activity reporting.
 
1961
 
 
1962
    We use a special purpose server to control the bytes sent and received and
 
1963
    be able to predict the activity on the client socket.
 
1964
    """
 
1965
 
 
1966
    def setUp(self):
 
1967
        tests.TestCase.setUp(self)
 
1968
        self.server = self._activity_server(self._protocol_version)
 
1969
        self.server.setUp()
 
1970
        self.activities = {}
 
1971
        def report_activity(t, bytes, direction):
 
1972
            count = self.activities.get(direction, 0)
 
1973
            count += bytes
 
1974
            self.activities[direction] = count
 
1975
 
 
1976
        # We override at class level because constructors may propagate the
 
1977
        # bound method and render instance overriding ineffective (an
 
1978
        # alternative would be to define a specific ui factory instead...)
 
1979
        self.orig_report_activity = self._transport._report_activity
 
1980
        self._transport._report_activity = report_activity
 
1981
 
 
1982
    def tearDown(self):
 
1983
        self._transport._report_activity = self.orig_report_activity
 
1984
        self.server.tearDown()
 
1985
        tests.TestCase.tearDown(self)
 
1986
 
 
1987
    def get_transport(self):
 
1988
        return self._transport(self.server.get_url())
 
1989
 
 
1990
    def assertActivitiesMatch(self):
 
1991
        self.assertEqual(self.server.bytes_read,
 
1992
                         self.activities.get('write', 0), 'written bytes')
 
1993
        self.assertEqual(self.server.bytes_written,
 
1994
                         self.activities.get('read', 0), 'read bytes')
 
1995
 
 
1996
    def test_get(self):
 
1997
        self.server.canned_response = '''HTTP/1.1 200 OK\r
 
1998
Date: Tue, 11 Jul 2006 04:32:56 GMT\r
 
1999
Server: Apache/2.0.54 (Fedora)\r
 
2000
Last-Modified: Sun, 23 Apr 2006 19:35:20 GMT\r
 
2001
ETag: "56691-23-38e9ae00"\r
 
2002
Accept-Ranges: bytes\r
 
2003
Content-Length: 35\r
 
2004
Connection: close\r
 
2005
Content-Type: text/plain; charset=UTF-8\r
 
2006
\r
 
2007
Bazaar-NG meta directory, format 1
 
2008
'''
 
2009
        t = self.get_transport()
 
2010
        self.assertEqual('Bazaar-NG meta directory, format 1\n',
 
2011
                         t.get('foo/bar').read())
 
2012
        self.assertActivitiesMatch()
 
2013
 
 
2014
    def test_has(self):
 
2015
        self.server.canned_response = '''HTTP/1.1 200 OK\r
 
2016
Server: SimpleHTTP/0.6 Python/2.5.2\r
 
2017
Date: Thu, 29 Jan 2009 20:21:47 GMT\r
 
2018
Content-type: application/octet-stream\r
 
2019
Content-Length: 20\r
 
2020
Last-Modified: Thu, 29 Jan 2009 20:21:47 GMT\r
 
2021
\r
 
2022
'''
 
2023
        t = self.get_transport()
 
2024
        self.assertTrue(t.has('foo/bar'))
 
2025
        self.assertActivitiesMatch()
 
2026
 
 
2027
    def test_readv(self):
 
2028
        self.server.canned_response = '''HTTP/1.1 206 Partial Content\r
 
2029
Date: Tue, 11 Jul 2006 04:49:48 GMT\r
 
2030
Server: Apache/2.0.54 (Fedora)\r
 
2031
Last-Modified: Thu, 06 Jul 2006 20:22:05 GMT\r
 
2032
ETag: "238a3c-16ec2-805c5540"\r
 
2033
Accept-Ranges: bytes\r
 
2034
Content-Length: 1534\r
 
2035
Connection: close\r
 
2036
Content-Type: multipart/byteranges; boundary=418470f848b63279b\r
 
2037
\r
 
2038
\r
 
2039
--418470f848b63279b\r
 
2040
Content-type: text/plain; charset=UTF-8\r
 
2041
Content-range: bytes 0-254/93890\r
 
2042
\r
 
2043
mbp@sourcefrog.net-20050309040815-13242001617e4a06
 
2044
mbp@sourcefrog.net-20050309040929-eee0eb3e6d1e7627
 
2045
mbp@sourcefrog.net-20050309040957-6cad07f466bb0bb8
 
2046
mbp@sourcefrog.net-20050309041501-c840e09071de3b67
 
2047
mbp@sourcefrog.net-20050309044615-c24a3250be83220a
 
2048
\r
 
2049
--418470f848b63279b\r
 
2050
Content-type: text/plain; charset=UTF-8\r
 
2051
Content-range: bytes 1000-2049/93890\r
 
2052
\r
 
2053
40-fd4ec249b6b139ab
 
2054
mbp@sourcefrog.net-20050311063625-07858525021f270b
 
2055
mbp@sourcefrog.net-20050311231934-aa3776aff5200bb9
 
2056
mbp@sourcefrog.net-20050311231953-73aeb3a131c3699a
 
2057
mbp@sourcefrog.net-20050311232353-f5e33da490872c6a
 
2058
mbp@sourcefrog.net-20050312071639-0a8f59a34a024ff0
 
2059
mbp@sourcefrog.net-20050312073432-b2c16a55e0d6e9fb
 
2060
mbp@sourcefrog.net-20050312073831-a47c3335ece1920f
 
2061
mbp@sourcefrog.net-20050312085412-13373aa129ccbad3
 
2062
mbp@sourcefrog.net-20050313052251-2bf004cb96b39933
 
2063
mbp@sourcefrog.net-20050313052856-3edd84094687cb11
 
2064
mbp@sourcefrog.net-20050313053233-e30a4f28aef48f9d
 
2065
mbp@sourcefrog.net-20050313053853-7c64085594ff3072
 
2066
mbp@sourcefrog.net-20050313054757-a86c3f5871069e22
 
2067
mbp@sourcefrog.net-20050313061422-418f1f73b94879b9
 
2068
mbp@sourcefrog.net-20050313120651-497bd231b19df600
 
2069
mbp@sourcefrog.net-20050314024931-eae0170ef25a5d1a
 
2070
mbp@sourcefrog.net-20050314025438-d52099f915fe65fc
 
2071
mbp@sourcefrog.net-20050314025539-637a636692c055cf
 
2072
mbp@sourcefrog.net-20050314025737-55eb441f430ab4ba
 
2073
mbp@sourcefrog.net-20050314025901-d74aa93bb7ee8f62
 
2074
mbp@source\r
 
2075
--418470f848b63279b--\r
 
2076
'''
 
2077
        t = self.get_transport()
 
2078
        # Remember that the request is ignored and that the ranges below
 
2079
        # doesn't have to match the canned response.
 
2080
        l = list(t.readv('/foo/bar', ((0, 255), (1000, 1050))))
 
2081
        self.assertEqual(2, len(l))
 
2082
        self.assertActivitiesMatch()
 
2083
 
 
2084
    def test_post(self):
 
2085
        self.server.canned_response = '''HTTP/1.1 200 OK\r
 
2086
Date: Tue, 11 Jul 2006 04:32:56 GMT\r
 
2087
Server: Apache/2.0.54 (Fedora)\r
 
2088
Last-Modified: Sun, 23 Apr 2006 19:35:20 GMT\r
 
2089
ETag: "56691-23-38e9ae00"\r
 
2090
Accept-Ranges: bytes\r
 
2091
Content-Length: 35\r
 
2092
Connection: close\r
 
2093
Content-Type: text/plain; charset=UTF-8\r
 
2094
\r
 
2095
lalala whatever as long as itsssss
 
2096
'''
 
2097
        t = self.get_transport()
 
2098
        # We must send a single line of body bytes, see
 
2099
        # PredefinedRequestHandler.handle_one_request
 
2100
        code, f = t._post('abc def end-of-body\n')
 
2101
        self.assertEqual('lalala whatever as long as itsssss\n', f.read())
 
2102
        self.assertActivitiesMatch()