/brz/remove-bazaar

To get this branch, use:
bzr branch http://gegoxaren.bato24.eu/bzr/brz/remove-bazaar

« back to all changes in this revision

Viewing changes to breezy/tests/test_http_response.py

  • Committer: Jelmer Vernooij
  • Date: 2017-06-04 19:17:13 UTC
  • mfrom: (0.193.10 trunk)
  • mto: This revision was merged to the branch mainline in revision 6778.
  • Revision ID: jelmer@jelmer.uk-20170604191713-hau7dfsqsl035slm
Bundle the cvs plugin.

Show diffs side-by-side

added added

removed removed

Lines of Context:
1
 
# Copyright (C) 2006-2010 Canonical Ltd
 
1
# Copyright (C) 2006-2010, 2012, 2013, 2016 Canonical Ltd
2
2
#
3
3
# This program is free software; you can redistribute it and/or modify
4
4
# it under the terms of the GNU General Public License as published by
37
37
  InvalidHttpResponse.
38
38
"""
39
39
 
40
 
from cStringIO import StringIO
41
40
import httplib
42
41
 
43
 
from bzrlib import (
 
42
from .. import (
44
43
    errors,
45
44
    tests,
46
45
    )
47
 
from bzrlib.transport.http import (
 
46
from ..sixish import (
 
47
    BytesIO,
 
48
    )
 
49
from ..transport.http import (
48
50
    response,
49
51
    _urllib2_wrappers,
50
52
    )
51
 
from bzrlib.tests.file_utils import (
 
53
from .file_utils import (
52
54
    FakeReadFile,
53
55
    )
54
56
 
57
59
    """A socket-like object that can be given a predefined content."""
58
60
 
59
61
    def __init__(self, data):
60
 
        self.readfile = StringIO(data)
 
62
        self.readfile = BytesIO(data)
61
63
 
62
64
    def makefile(self, mode='r', bufsize=None):
63
65
        return self.readfile
75
77
        pass
76
78
 
77
79
 
 
80
class TestResponseFileIter(tests.TestCase):
 
81
 
 
82
    def test_iter_empty(self):
 
83
        f = response.ResponseFile('empty', BytesIO())
 
84
        self.assertEqual([], list(f))
 
85
 
 
86
    def test_iter_many(self):
 
87
        f = response.ResponseFile('many', BytesIO(b'0\n1\nboo!\n'))
 
88
        self.assertEqual(['0\n', '1\n', 'boo!\n'], list(f))
 
89
 
 
90
 
78
91
class TestHTTPConnection(tests.TestCase):
79
92
 
80
93
    def test_cleanup_pipe(self):
92
105
        # Now, get the response
93
106
        resp = conn.getresponse()
94
107
        # Read part of the response
95
 
        self.assertEquals('0123456789\n', resp.read(11))
 
108
        self.assertEqual('0123456789\n', resp.read(11))
96
109
        # Override the thresold to force the warning emission
97
110
        conn._range_warning_thresold = 6 # There are 7 bytes pending
98
111
        conn.cleanup_pipe()
110
123
 
111
124
    def test_can_read_at_first_access(self):
112
125
        """Test that the just created file can be read."""
113
 
        self.assertEquals(self.alpha, self._file.read())
 
126
        self.assertEqual(self.alpha, self._file.read())
114
127
 
115
128
    def test_seek_read(self):
116
129
        """Test seek/read inside the range."""
117
130
        f = self._file
118
131
        start = self.first_range_start
119
132
        # Before any use, tell() should be at the range start
120
 
        self.assertEquals(start, f.tell())
 
133
        self.assertEqual(start, f.tell())
121
134
        cur = start # For an overall offset assertion
122
135
        f.seek(start + 3)
123
136
        cur += 3
124
 
        self.assertEquals('def', f.read(3))
 
137
        self.assertEqual('def', f.read(3))
125
138
        cur += len('def')
126
139
        f.seek(4, 1)
127
140
        cur += 4
128
 
        self.assertEquals('klmn', f.read(4))
 
141
        self.assertEqual('klmn', f.read(4))
129
142
        cur += len('klmn')
130
143
        # read(0) in the middle of a range
131
 
        self.assertEquals('', f.read(0))
 
144
        self.assertEqual('', f.read(0))
132
145
        # seek in place
133
146
        here = f.tell()
134
147
        f.seek(0, 1)
135
 
        self.assertEquals(here, f.tell())
136
 
        self.assertEquals(cur, f.tell())
 
148
        self.assertEqual(here, f.tell())
 
149
        self.assertEqual(cur, f.tell())
137
150
 
138
151
    def test_read_zero(self):
139
152
        f = self._file
140
 
        start = self.first_range_start
141
 
        self.assertEquals('', f.read(0))
 
153
        self.assertEqual('', f.read(0))
142
154
        f.seek(10, 1)
143
 
        self.assertEquals('', f.read(0))
 
155
        self.assertEqual('', f.read(0))
144
156
 
145
157
    def test_seek_at_range_end(self):
146
158
        f = self._file
149
161
    def test_read_at_range_end(self):
150
162
        """Test read behaviour at range end."""
151
163
        f = self._file
152
 
        self.assertEquals(self.alpha, f.read())
153
 
        self.assertEquals('', f.read(0))
 
164
        self.assertEqual(self.alpha, f.read())
 
165
        self.assertEqual('', f.read(0))
154
166
        self.assertRaises(errors.InvalidRange, f.read, 1)
155
167
 
156
168
    def test_unbounded_read_after_seek(self):
157
169
        f = self._file
158
170
        f.seek(24, 1)
159
171
        # Should not cross ranges
160
 
        self.assertEquals('yz', f.read())
 
172
        self.assertEqual('yz', f.read())
161
173
 
162
174
    def test_seek_backwards(self):
163
175
        f = self._file
187
199
 
188
200
       The semantic is unclear in case of multiple ranges. Seeking from end
189
201
       exists only for the http transports, cannot be used if the file size is
190
 
       unknown and is not used in bzrlib itself. This test must be (and is)
 
202
       unknown and is not used in breezy itself. This test must be (and is)
191
203
       overridden by daughter classes.
192
204
 
193
205
       Reading from end makes sense only when a range has been requested from
197
209
       """
198
210
       f = self._file
199
211
       f.seek(-2, 2)
200
 
       self.assertEquals('yz', f.read())
 
212
       self.assertEqual('yz', f.read())
201
213
 
202
214
 
203
215
class TestRangeFileSizeUnknown(tests.TestCase, TestRangeFileMixin):
206
218
    def setUp(self):
207
219
        super(TestRangeFileSizeUnknown, self).setUp()
208
220
        self._file = response.RangeFile('Whole_file_size_known',
209
 
                                        StringIO(self.alpha))
 
221
                                        BytesIO(self.alpha))
210
222
        # We define no range, relying on RangeFile to provide default values
211
223
        self.first_range_start = 0 # It's the whole file
212
224
 
220
232
    def test_read_at_range_end(self):
221
233
        """Test read behaviour at range end."""
222
234
        f = self._file
223
 
        self.assertEquals(self.alpha, f.read())
224
 
        self.assertEquals('', f.read(0))
225
 
        self.assertEquals('', f.read(1))
 
235
        self.assertEqual(self.alpha, f.read())
 
236
        self.assertEqual('', f.read(0))
 
237
        self.assertEqual('', f.read(1))
226
238
 
227
239
 
228
240
class TestRangeFileSizeKnown(tests.TestCase, TestRangeFileMixin):
231
243
    def setUp(self):
232
244
        super(TestRangeFileSizeKnown, self).setUp()
233
245
        self._file = response.RangeFile('Whole_file_size_known',
234
 
                                        StringIO(self.alpha))
 
246
                                        BytesIO(self.alpha))
235
247
        self._file.set_range(0, len(self.alpha))
236
248
        self.first_range_start = 0 # It's the whole file
237
249
 
242
254
    def setUp(self):
243
255
        super(TestRangeFileSingleRange, self).setUp()
244
256
        self._file = response.RangeFile('Single_range_file',
245
 
                                        StringIO(self.alpha))
 
257
                                        BytesIO(self.alpha))
246
258
        self.first_range_start = 15
247
259
        self._file.set_range(self.first_range_start, len(self.alpha))
248
260
 
291
303
        content += self._boundary_line()
292
304
 
293
305
        self._file = response.RangeFile('Multiple_ranges_file',
294
 
                                        StringIO(content))
 
306
                                        BytesIO(content))
295
307
        self.set_file_boundary()
296
308
 
297
309
    def _boundary_line(self):
337
349
 
338
350
    def test_read_all_ranges(self):
339
351
        f = self._file
340
 
        self.assertEquals(self.alpha, f.read()) # Read first range
 
352
        self.assertEqual(self.alpha, f.read()) # Read first range
341
353
        f.seek(100) # Trigger the second range recognition
342
 
        self.assertEquals(self.alpha, f.read()) # Read second range
343
 
        self.assertEquals(126, f.tell())
 
354
        self.assertEqual(self.alpha, f.read()) # Read second range
 
355
        self.assertEqual(126, f.tell())
344
356
        f.seek(126) # Start of third range which is also the current pos !
345
 
        self.assertEquals('A', f.read(1))
 
357
        self.assertEqual('A', f.read(1))
346
358
        f.seek(10, 1)
347
 
        self.assertEquals('LMN', f.read(3))
 
359
        self.assertEqual('LMN', f.read(3))
348
360
 
349
361
    def test_seek_from_end(self):
350
362
        """See TestRangeFileMixin.test_seek_from_end."""
354
366
        # behaviour.
355
367
        f = self._file
356
368
        f.seek(-2, 2)
357
 
        self.assertEquals('yz', f.read())
 
369
        self.assertEqual('yz', f.read())
358
370
        self.assertRaises(errors.InvalidRange, f.seek, -2, 2)
359
371
 
360
372
    def test_seek_into_void(self):
371
383
 
372
384
    def test_seek_across_ranges(self):
373
385
        f = self._file
374
 
        start = self.first_range_start
375
386
        f.seek(126) # skip the two first ranges
376
 
        self.assertEquals('AB', f.read(2))
 
387
        self.assertEqual('AB', f.read(2))
377
388
 
378
389
    def test_checked_read_dont_overflow_buffers(self):
379
390
        f = self._file
380
 
        start = self.first_range_start
381
391
        # We force a very low value to exercise all code paths in _checked_read
382
392
        f._discarded_buf_size = 8
383
393
        f.seek(126) # skip the two first ranges
384
 
        self.assertEquals('AB', f.read(2))
 
394
        self.assertEqual('AB', f.read(2))
385
395
 
386
396
    def test_seek_twice_between_ranges(self):
387
397
        f = self._file
399
409
 
400
410
    def test_read_at_range_end(self):
401
411
        f = self._file
402
 
        self.assertEquals(self.alpha, f.read())
403
 
        self.assertEquals(self.alpha, f.read())
404
 
        self.assertEquals(self.alpha.upper(), f.read())
 
412
        self.assertEqual(self.alpha, f.read())
 
413
        self.assertEqual(self.alpha, f.read())
 
414
        self.assertEqual(self.alpha.upper(), f.read())
405
415
        self.assertRaises(errors.InvalidHttpResponse, f.read, 1)
406
416
 
407
417
 
435
445
 
436
446
    def test_seek_whence(self):
437
447
        """Test the seek whence parameter values."""
438
 
        f = response.RangeFile('foo', StringIO('abc'))
 
448
        f = response.RangeFile('foo', BytesIO(b'abc'))
439
449
        f.set_range(0, 3)
440
450
        f.seek(0)
441
451
        f.seek(1, 1)
445
455
    def test_range_syntax(self):
446
456
        """Test the Content-Range scanning."""
447
457
 
448
 
        f = response.RangeFile('foo', StringIO())
 
458
        f = response.RangeFile('foo', BytesIO())
449
459
 
450
460
        def ok(expected, header_value):
451
461
            f.set_range_from_header(header_value)
452
462
            # Slightly peek under the covers to get the size
453
 
            self.assertEquals(expected, (f.tell(), f._size))
 
463
            self.assertEqual(expected, (f.tell(), f._size))
454
464
 
455
465
        ok((1, 10), 'bytes 1-10/11')
456
466
        ok((1, 10), 'bytes 1-10/*')
701
711
class TestHandleResponse(tests.TestCase):
702
712
 
703
713
    def _build_HTTPMessage(self, raw_headers):
704
 
        status_and_headers = StringIO(raw_headers)
 
714
        status_and_headers = BytesIO(raw_headers)
705
715
        # Get rid of the status line
706
716
        status_and_headers.readline()
707
717
        msg = httplib.HTTPMessage(status_and_headers)
712
722
        code, raw_headers, body = a_response
713
723
        msg = self._build_HTTPMessage(raw_headers)
714
724
        return response.handle_response('http://foo', code, msg,
715
 
                                        StringIO(a_response[2]))
 
725
                                        BytesIO(a_response[2]))
716
726
 
717
727
    def test_full_text(self):
718
728
        out = self.get_response(_full_text_response)
719
 
        # It is a StringIO from the original data
 
729
        # It is a BytesIO from the original data
720
730
        self.assertEqual(_full_text_response[2], out.read())
721
731
 
722
732
    def test_single_range(self):
764
774
        # We should not require Content-Type for a full response
765
775
        code, raw_headers, body = _full_text_response_no_content_type
766
776
        msg = self._build_HTTPMessage(raw_headers)
767
 
        out = response.handle_response('http://foo', code, msg, StringIO(body))
 
777
        out = response.handle_response('http://foo', code, msg, BytesIO(body))
768
778
        self.assertEqual(body, out.read())
769
779
 
770
780
    def test_full_text_no_content_length(self):
771
781
        code, raw_headers, body = _full_text_response_no_content_length
772
782
        msg = self._build_HTTPMessage(raw_headers)
773
 
        out = response.handle_response('http://foo', code, msg, StringIO(body))
 
783
        out = response.handle_response('http://foo', code, msg, BytesIO(body))
774
784
        self.assertEqual(body, out.read())
775
785
 
776
786
    def test_missing_content_range(self):
778
788
        msg = self._build_HTTPMessage(raw_headers)
779
789
        self.assertRaises(errors.InvalidHttpResponse,
780
790
                          response.handle_response,
781
 
                          'http://bogus', code, msg, StringIO(body))
 
791
                          'http://bogus', code, msg, BytesIO(body))
782
792
 
783
793
    def test_multipart_no_content_range(self):
784
794
        code, raw_headers, body = _multipart_no_content_range
785
795
        msg = self._build_HTTPMessage(raw_headers)
786
796
        self.assertRaises(errors.InvalidHttpResponse,
787
797
                          response.handle_response,
788
 
                          'http://bogus', code, msg, StringIO(body))
 
798
                          'http://bogus', code, msg, BytesIO(body))
789
799
 
790
800
    def test_multipart_no_boundary(self):
791
801
        out = self.get_response(_multipart_no_boundary)
800
810
    """
801
811
 
802
812
    def setUp(self):
803
 
        tests.TestCase.setUp(self)
 
813
        super(TestRangeFileSizeReadLimited, self).setUp()
804
814
        # create a test datablock larger than _max_read_size.
805
815
        chunk_size = response.RangeFile._max_read_size
806
816
        test_pattern = '0123456789ABCDEF'