/brz/remove-bazaar

To get this branch, use:
bzr branch http://gegoxaren.bato24.eu/bzr/brz/remove-bazaar

« back to all changes in this revision

Viewing changes to breezy/tests/test_http.py

  • Committer: Jelmer Vernooij
  • Date: 2018-11-17 00:47:52 UTC
  • mfrom: (7182 work)
  • mto: This revision was merged to the branch mainline in revision 7305.
  • Revision ID: jelmer@jelmer.uk-20181117004752-6ywampe5pfywlby4
Merge trunk.

Show diffs side-by-side

added added

removed removed

Lines of Context:
87
87
def vary_by_http_protocol_version():
88
88
    """Test on http/1.0 and 1.1"""
89
89
    return [
90
 
        ('HTTP/1.0',  dict(_protocol_version='HTTP/1.0')),
91
 
        ('HTTP/1.1',  dict(_protocol_version='HTTP/1.1')),
 
90
        ('HTTP/1.0', dict(_protocol_version='HTTP/1.0')),
 
91
        ('HTTP/1.1', dict(_protocol_version='HTTP/1.1')),
92
92
        ]
93
93
 
94
94
 
125
125
def vary_by_http_activity():
126
126
    activity_scenarios = [
127
127
        ('urllib,http', dict(_activity_server=ActivityHTTPServer,
128
 
                            _transport=HttpTransport,)),
 
128
                             _transport=HttpTransport,)),
129
129
        ]
130
130
    if features.HTTPSServerFeature.available():
131
131
        # FIXME: Until we have a better way to handle self-signed certificates
135
135
        from . import (
136
136
            ssl_certs,
137
137
            )
 
138
 
138
139
        class HTTPS_transport(HttpTransport):
139
140
 
140
141
            def __init__(self, base, _from_transport=None):
226
227
    def parse_header(self, header, auth_handler_class=None):
227
228
        if auth_handler_class is None:
228
229
            auth_handler_class = _urllib2_wrappers.AbstractAuthHandler
229
 
        self.auth_handler =  auth_handler_class()
 
230
        self.auth_handler = auth_handler_class()
230
231
        return self.auth_handler._parse_auth_header(header)
231
232
 
232
233
    def test_empty_header(self):
275
276
    def setUp(self):
276
277
        super(TestHTTPRangeParsing, self).setUp()
277
278
        # We focus on range parsing here and ignore everything else
 
279
 
278
280
        class RequestHandler(http_server.TestingHTTPRequestHandler):
279
281
            def setup(self): pass
 
282
 
280
283
            def handle(self): pass
 
284
 
281
285
            def finish(self): pass
282
286
 
283
287
        self.req_handler = RequestHandler(None, None, None)
284
288
 
285
289
    def assertRanges(self, ranges, header, file_size):
286
290
        self.assertEqual(ranges,
287
 
                          self.req_handler._parse_ranges(header, file_size))
 
291
                         self.req_handler._parse_ranges(header, file_size))
288
292
 
289
293
    def test_simple_range(self):
290
294
        self.assertRanges([(0, 2)], 'bytes=0-2', 12)
428
432
        self.assertEqual(t.has('foo/bar'), True)
429
433
        self.assertEqual(len(server.logs), 1)
430
434
        self.assertContainsRe(server.logs[0],
431
 
            r'"HEAD /foo/bar HTTP/1.." (200|302) - "-" "Breezy/')
 
435
                              r'"HEAD /foo/bar HTTP/1.." (200|302) - "-" "Breezy/')
432
436
 
433
437
    def test_http_has_not_found(self):
434
438
        server = self.get_readonly_server()
435
439
        t = self.get_readonly_transport()
436
440
        self.assertEqual(t.has('not-found'), False)
437
441
        self.assertContainsRe(server.logs[1],
438
 
            r'"HEAD /not-found HTTP/1.." 404 - "-" "Breezy/')
 
442
                              r'"HEAD /not-found HTTP/1.." 404 - "-" "Breezy/')
439
443
 
440
444
    def test_http_get(self):
441
445
        server = self.get_readonly_server()
493
497
        code, response = http_transport._post(b'abc def end-of-body')
494
498
        self.assertTrue(
495
499
            server.received_bytes.startswith(b'POST /.bzr/smart HTTP/1.'))
496
 
        self.assertTrue(b'content-length: 19\r' in server.received_bytes.lower())
 
500
        self.assertTrue(
 
501
            b'content-length: 19\r' in server.received_bytes.lower())
497
502
        self.assertTrue(b'content-type: application/octet-stream\r'
498
503
                        in server.received_bytes.lower())
499
504
        # The transport should not be assuming that the server can accept
507
512
    """Test range_header method"""
508
513
 
509
514
    def check_header(self, value, ranges=[], tail=0):
510
 
        offsets = [ (start, end - start + 1) for start, end in ranges]
 
515
        offsets = [(start, end - start + 1) for start, end in ranges]
511
516
        coalesce = transport.Transport._coalesce_offsets
512
517
        coalesced = list(coalesce(offsets, limit=0, fudge_factor=0))
513
518
        range_header = http.HttpTransport._range_header
877
882
 
878
883
 
879
884
class MultipleRangeWithoutContentLengthRequestHandler(
880
 
    http_server.TestingHTTPRequestHandler):
 
885
        http_server.TestingHTTPRequestHandler):
881
886
    """Reply to multiple range requests without content length header."""
882
887
 
883
888
    def get_multiple_ranges(self, file, file_size, ranges):
907
912
 
908
913
 
909
914
class TruncatedMultipleRangeRequestHandler(
910
 
    http_server.TestingHTTPRequestHandler):
 
915
        http_server.TestingHTTPRequestHandler):
911
916
    """Reply to multiple range requests truncating the last ones.
912
917
 
913
918
    This server generates responses whose Content-Length describes all the
932
937
                'Content-type', 'application/octet-stream')
933
938
            content_length += self._header_line_length(
934
939
                'Content-Range', 'bytes %d-%d/%d' % (start, end, file_size))
935
 
            content_length += len('\r\n') # end headers
936
 
            content_length += end - start # + 1
 
940
            content_length += len('\r\n')  # end headers
 
941
            content_length += end - start  # + 1
937
942
        content_length += len(boundary_line)
938
943
        self.send_header('Content-length', content_length)
939
944
        self.end_headers()
985
990
 
986
991
 
987
992
class TruncatedBeforeBoundaryRequestHandler(
988
 
    http_server.TestingHTTPRequestHandler):
 
993
        http_server.TestingHTTPRequestHandler):
989
994
    """Truncation before a boundary, like in bug 198646"""
990
995
 
991
996
    _truncated_ranges = 1
1005
1010
                'Content-type', 'application/octet-stream')
1006
1011
            content_length += self._header_line_length(
1007
1012
                'Content-Range', 'bytes %d-%d/%d' % (start, end, file_size))
1008
 
            content_length += len('\r\n') # end headers
1009
 
            content_length += end - start # + 1
 
1013
            content_length += len('\r\n')  # end headers
 
1014
            content_length += end - start  # + 1
1010
1015
        content_length += len(boundary_line)
1011
1016
        self.send_header('Content-length', content_length)
1012
1017
        self.end_headers()
1133
1138
    def assertEvaluateProxyBypass(self, expected, host, no_proxy):
1134
1139
        handler = _urllib2_wrappers.ProxyHandler()
1135
1140
        self.assertEqual(expected,
1136
 
                          handler.evaluate_proxy_bypass(host, no_proxy))
 
1141
                         handler.evaluate_proxy_bypass(host, no_proxy))
1137
1142
 
1138
1143
    def test_empty_user(self):
1139
1144
        self.overrideEnv('http_proxy', 'http://bar.com')
1267
1272
 
1268
1273
    def _file_contents(self, relpath, ranges):
1269
1274
        t = self.get_readonly_transport()
1270
 
        offsets = [ (start, end - start + 1) for start, end in ranges]
 
1275
        offsets = [(start, end - start + 1) for start, end in ranges]
1271
1276
        coalesce = t._coalesce_offsets
1272
1277
        coalesced = list(coalesce(offsets, limit=0, fudge_factor=0))
1273
1278
        code, data = t._get(relpath, coalesced)
1293
1298
 
1294
1299
    def test_syntactically_invalid_range_header(self):
1295
1300
        self.assertListRaises(errors.InvalidHttpRange,
1296
 
                          self._file_contents, 'a', [(4, 3)])
 
1301
                              self._file_contents, 'a', [(4, 3)])
1297
1302
 
1298
1303
    def test_semantically_invalid_range_header(self):
1299
1304
        self.assertListRaises(errors.InvalidHttpRange,
1300
 
                          self._file_contents, 'a', [(42, 128)])
 
1305
                              self._file_contents, 'a', [(42, 128)])
1301
1306
 
1302
1307
 
1303
1308
class TestHTTPRedirections(http_utils.TestCaseWithRedirectedWebserver):
1312
1317
        super(TestHTTPRedirections, self).setUp()
1313
1318
        self.build_tree_contents([('a', b'0123456789'),
1314
1319
                                  ('bundle',
1315
 
                                  b'# Bazaar revision bundle v0.9\n#\n')
 
1320
                                   b'# Bazaar revision bundle v0.9\n#\n')
1316
1321
                                  ],)
1317
1322
 
1318
1323
    def test_redirected(self):
1319
1324
        self.assertRaises(errors.RedirectRequested,
1320
1325
                          self.get_old_transport().get, 'a')
1321
1326
        self.assertEqual(
1322
 
                b'0123456789',
1323
 
                self.get_new_transport().get('a').read())
 
1327
            b'0123456789',
 
1328
            self.get_new_transport().get('a').read())
1324
1329
 
1325
1330
 
1326
1331
class RedirectedRequest(_urllib2_wrappers.Request):
1352
1357
            sock.close()
1353
1358
        except socket.error:
1354
1359
            pass
 
1360
 
1355
1361
    def connect(connection):
1356
1362
        test.http_connect_orig(connection)
1357
1363
        test.addCleanup(socket_disconnect, connection.sock)
1358
1364
    test.http_connect_orig = test.overrideAttr(
1359
1365
        _urllib2_wrappers.HTTPConnection, 'connect', connect)
 
1366
 
1360
1367
    def connect(connection):
1361
1368
        test.https_connect_orig(connection)
1362
1369
        test.addCleanup(socket_disconnect, connection.sock)
1401
1408
        new_prefix = 'http://%s:%s' % (self.new_server.host,
1402
1409
                                       self.new_server.port)
1403
1410
        self.old_server.redirections = \
1404
 
            [('(.*)', r'%s/1\1' % (new_prefix), 301),]
 
1411
            [('(.*)', r'%s/1\1' % (new_prefix), 301), ]
1405
1412
        self.assertEqual(b'redirected once', t._perform(req).read())
1406
1413
 
1407
1414
    def test_five_redirections(self):
1431
1438
 
1432
1439
    def setUp(self):
1433
1440
        super(TestDoCatchRedirections, self).setUp()
1434
 
        self.build_tree_contents([('a', b'0123456789'),],)
 
1441
        self.build_tree_contents([('a', b'0123456789'), ],)
1435
1442
        cleanup_http_redirection_connections(self)
1436
1443
 
1437
1444
        self.old_transport = self.get_old_transport()
1445
1452
        # We use None for redirected so that we fail if redirected
1446
1453
        self.assertEqual(b'0123456789',
1447
1454
                         transport.do_catching_redirections(
1448
 
                self.get_a, t, None).read())
 
1455
                             self.get_a, t, None).read())
1449
1456
 
1450
1457
    def test_one_redirection(self):
1451
1458
        self.redirections = 0
1457
1464
 
1458
1465
        self.assertEqual(b'0123456789',
1459
1466
                         transport.do_catching_redirections(
1460
 
                self.get_a, self.old_transport, redirected).read())
 
1467
                             self.get_a, self.old_transport, redirected).read())
1461
1468
        self.assertEqual(1, self.redirections)
1462
1469
 
1463
1470
    def test_redirection_loop(self):
1515
1522
        super(TestAuth, self).setUp()
1516
1523
        self.server = self.get_readonly_server()
1517
1524
        self.build_tree_contents([('a', b'contents of a\n'),
1518
 
                                  ('b', b'contents of b\n'),])
 
1525
                                  ('b', b'contents of b\n'), ])
1519
1526
 
1520
1527
    def create_transport_readonly_server(self):
1521
1528
        server = self._auth_server(protocol_version=self._protocol_version)
1622
1629
    def _expected_username_prompt(self, scheme):
1623
1630
        return (self._username_prompt_prefix
1624
1631
                + "%s %s:%d, Realm: '%s' username: " % (scheme.upper(),
1625
 
                                 self.server.host, self.server.port,
1626
 
                                 self.server.auth_realm))
 
1632
                                                        self.server.host, self.server.port,
 
1633
                                                        self.server.auth_realm))
1627
1634
 
1628
1635
    def test_no_prompt_for_password_when_using_auth_config(self):
1629
 
        user =' joe'
 
1636
        user = ' joe'
1630
1637
        password = 'foo'
1631
1638
        stdin_content = 'bar\n'  # Not the right password
1632
1639
        self.server.add_user(user, password)
1684
1691
        t = self.get_user_transport(user, password)
1685
1692
        # Capture the debug calls to mutter
1686
1693
        self.mutters = []
 
1694
 
1687
1695
        def mutter(*args):
1688
1696
            lines = args[0] % args[1:]
1689
1697
            # Some calls output multiple lines, just split them now since we
1819
1827
        httpd = self.http_server.server
1820
1828
 
1821
1829
        socket = SampleSocket(
1822
 
            b'POST /.bzr/smart %s \r\n' % self._protocol_version.encode('ascii')
 
1830
            b'POST /.bzr/smart %s \r\n' % self._protocol_version.encode('ascii') +
1823
1831
            # HTTP/1.1 posts must have a Content-Length (but it doesn't hurt
1824
1832
            # for 1.0)
1825
 
            + b'Content-Length: 6\r\n'
 
1833
            b'Content-Length: 6\r\n'
1826
1834
            b'\r\n'
1827
1835
            b'hello\n')
1828
1836
        # Beware: the ('localhost', 80) below is the
1835
1843
                                                         httpd)
1836
1844
        response = socket.writefile.getvalue()
1837
1845
        self.assertStartsWith(
1838
 
                response,
1839
 
                b'%s 200 ' % self._protocol_version.encode('ascii'))
 
1846
            response,
 
1847
            b'%s 200 ' % self._protocol_version.encode('ascii'))
1840
1848
        # This includes the end of the HTTP headers, and all the body.
1841
1849
        expected_end_of_response = b'\r\n\r\nok\x012\n'
1842
1850
        self.assertEndsWith(response, expected_end_of_response)
1893
1901
                             'http://foo.example.com/foo/subdir')
1894
1902
        self.assertIsInstance(r, type(t))
1895
1903
        self.assertEqual('http://foo.example.com/foo/subdir/',
1896
 
            r.external_url())
 
1904
                         r.external_url())
1897
1905
 
1898
1906
    def test_redirected_to_same_host_sibling_protocol(self):
1899
1907
        t = self._transport('http://www.example.com/foo')
1901
1909
                             'https://www.example.com/foo')
1902
1910
        self.assertIsInstance(r, type(t))
1903
1911
        self.assertEqual('https://www.example.com/foo/',
1904
 
            r.external_url())
 
1912
                         r.external_url())
1905
1913
 
1906
1914
    def test_redirected_to_same_host_different_protocol(self):
1907
1915
        t = self._transport('http://www.example.com/foo')
1989
1997
 
1990
1998
if features.HTTPSServerFeature.available():
1991
1999
    from . import https_server
 
2000
 
1992
2001
    class ActivityHTTPSServer(ActivityServerMixin, https_server.HTTPSServer):
1993
2002
        pass
1994
2003
 
2004
2013
        self.server = self._activity_server(self._protocol_version)
2005
2014
        self.server.start_server()
2006
2015
        self.addCleanup(self.server.stop_server)
2007
 
        _activities = {} # Don't close over self and create a cycle
 
2016
        _activities = {}  # Don't close over self and create a cycle
 
2017
 
2008
2018
        def report_activity(t, bytes, direction):
2009
2019
            count = _activities.get(direction, 0)
2010
2020
            count += bytes
2189
2199
        new_prefix = 'http://%s:%s' % (self.new_server.host,
2190
2200
                                       self.new_server.port)
2191
2201
        self.old_server.redirections = [
2192
 
            ('(.*)', r'%s/1\1' % (new_prefix), 301),]
 
2202
            ('(.*)', r'%s/1\1' % (new_prefix), 301), ]
2193
2203
        self.old_transport = self.get_old_transport()
2194
2204
        self.new_server.add_user('joe', 'foo')
2195
2205
        cleanup_http_redirection_connections(self)
2214
2224
        ui.ui_factory = tests.TestUIFactory(stdin='joe\nfoo\n')
2215
2225
        self.assertEqual(b'redirected once',
2216
2226
                         transport.do_catching_redirections(
2217
 
                self.get_a, self.old_transport, redirected).read())
 
2227
                             self.get_a, self.old_transport, redirected).read())
2218
2228
        self.assertEqual(1, self.redirections)
2219
2229
        # stdin should be empty
2220
2230
        self.assertEqual('', ui.ui_factory.stdin.readline())
2229
2239
        new_prefix = 'http://%s:%s' % (self.new_server.host,
2230
2240
                                       self.new_server.port)
2231
2241
        self.old_server.redirections = [
2232
 
            ('(.*)', r'%s/1\1' % (new_prefix), 301),]
 
2242
            ('(.*)', r'%s/1\1' % (new_prefix), 301), ]
2233
2243
        self.assertEqual(b'redirected once', t._perform(req).read())
2234
2244
        # stdin should be empty
2235
2245
        self.assertEqual('', ui.ui_factory.stdin.readline())
2236
2246
        # stdout should be empty, stderr will contain the prompts
2237
2247
        self.assertEqual('', ui.ui_factory.stdout.getvalue())
2238