1
# Copyright (C) 2006-2010, 2012, 2013, 2016 Canonical Ltd
1
# Copyright (C) 2006-2010 Canonical Ltd
3
3
# This program is free software; you can redistribute it and/or modify
4
4
# it under the terms of the GNU General Public License as published by
37
37
InvalidHttpResponse.
41
import http.client as http_client
42
except ImportError: # python < 3
43
import httplib as http_client
40
from cStringIO import StringIO
49
from ..sixish import (
52
from ..transport.http import (
47
from bzrlib.transport.http import (
56
from .file_utils import (
51
from bzrlib.tests.file_utils import (
62
57
"""A socket-like object that can be given a predefined content."""
64
59
def __init__(self, data):
65
self.readfile = BytesIO(data)
60
self.readfile = StringIO(data)
67
62
def makefile(self, mode='r', bufsize=None):
68
63
return self.readfile
83
78
class TestResponseFileIter(tests.TestCase):
85
80
def test_iter_empty(self):
86
f = response.ResponseFile('empty', BytesIO())
81
f = response.ResponseFile('empty', StringIO())
87
82
self.assertEqual([], list(f))
89
84
def test_iter_many(self):
90
f = response.ResponseFile('many', BytesIO(b'0\n1\nboo!\n'))
85
f = response.ResponseFile('many', StringIO('0\n1\nboo!\n'))
91
86
self.assertEqual(['0\n', '1\n', 'boo!\n'], list(f))
108
103
# Now, get the response
109
104
resp = conn.getresponse()
110
105
# Read part of the response
111
self.assertEqual('0123456789\n', resp.read(11))
106
self.assertEquals('0123456789\n', resp.read(11))
112
107
# Override the thresold to force the warning emission
113
108
conn._range_warning_thresold = 6 # There are 7 bytes pending
114
109
conn.cleanup_pipe()
127
122
def test_can_read_at_first_access(self):
128
123
"""Test that the just created file can be read."""
129
self.assertEqual(self.alpha, self._file.read())
124
self.assertEquals(self.alpha, self._file.read())
131
126
def test_seek_read(self):
132
127
"""Test seek/read inside the range."""
134
129
start = self.first_range_start
135
130
# Before any use, tell() should be at the range start
136
self.assertEqual(start, f.tell())
131
self.assertEquals(start, f.tell())
137
132
cur = start # For an overall offset assertion
138
133
f.seek(start + 3)
140
self.assertEqual('def', f.read(3))
135
self.assertEquals('def', f.read(3))
141
136
cur += len('def')
144
self.assertEqual('klmn', f.read(4))
139
self.assertEquals('klmn', f.read(4))
145
140
cur += len('klmn')
146
141
# read(0) in the middle of a range
147
self.assertEqual('', f.read(0))
142
self.assertEquals('', f.read(0))
151
self.assertEqual(here, f.tell())
152
self.assertEqual(cur, f.tell())
146
self.assertEquals(here, f.tell())
147
self.assertEquals(cur, f.tell())
154
149
def test_read_zero(self):
156
self.assertEqual('', f.read(0))
151
self.assertEquals('', f.read(0))
158
self.assertEqual('', f.read(0))
153
self.assertEquals('', f.read(0))
160
155
def test_seek_at_range_end(self):
164
159
def test_read_at_range_end(self):
165
160
"""Test read behaviour at range end."""
167
self.assertEqual(self.alpha, f.read())
168
self.assertEqual('', f.read(0))
162
self.assertEquals(self.alpha, f.read())
163
self.assertEquals('', f.read(0))
169
164
self.assertRaises(errors.InvalidRange, f.read, 1)
171
166
def test_unbounded_read_after_seek(self):
174
169
# Should not cross ranges
175
self.assertEqual('yz', f.read())
170
self.assertEquals('yz', f.read())
177
172
def test_seek_backwards(self):
203
198
The semantic is unclear in case of multiple ranges. Seeking from end
204
199
exists only for the http transports, cannot be used if the file size is
205
unknown and is not used in breezy itself. This test must be (and is)
200
unknown and is not used in bzrlib itself. This test must be (and is)
206
201
overridden by daughter classes.
208
203
Reading from end makes sense only when a range has been requested from
215
self.assertEqual('yz', f.read())
210
self.assertEquals('yz', f.read())
218
213
class TestRangeFileSizeUnknown(tests.TestCase, TestRangeFileMixin):
222
217
super(TestRangeFileSizeUnknown, self).setUp()
223
218
self._file = response.RangeFile('Whole_file_size_known',
219
StringIO(self.alpha))
225
220
# We define no range, relying on RangeFile to provide default values
226
221
self.first_range_start = 0 # It's the whole file
235
230
def test_read_at_range_end(self):
236
231
"""Test read behaviour at range end."""
238
self.assertEqual(self.alpha, f.read())
239
self.assertEqual('', f.read(0))
240
self.assertEqual('', f.read(1))
233
self.assertEquals(self.alpha, f.read())
234
self.assertEquals('', f.read(0))
235
self.assertEquals('', f.read(1))
243
238
class TestRangeFileSizeKnown(tests.TestCase, TestRangeFileMixin):
247
242
super(TestRangeFileSizeKnown, self).setUp()
248
243
self._file = response.RangeFile('Whole_file_size_known',
244
StringIO(self.alpha))
250
245
self._file.set_range(0, len(self.alpha))
251
246
self.first_range_start = 0 # It's the whole file
258
253
super(TestRangeFileSingleRange, self).setUp()
259
254
self._file = response.RangeFile('Single_range_file',
255
StringIO(self.alpha))
261
256
self.first_range_start = 15
262
257
self._file.set_range(self.first_range_start, len(self.alpha))
353
348
def test_read_all_ranges(self):
355
self.assertEqual(self.alpha, f.read()) # Read first range
350
self.assertEquals(self.alpha, f.read()) # Read first range
356
351
f.seek(100) # Trigger the second range recognition
357
self.assertEqual(self.alpha, f.read()) # Read second range
358
self.assertEqual(126, f.tell())
352
self.assertEquals(self.alpha, f.read()) # Read second range
353
self.assertEquals(126, f.tell())
359
354
f.seek(126) # Start of third range which is also the current pos !
360
self.assertEqual('A', f.read(1))
355
self.assertEquals('A', f.read(1))
362
self.assertEqual('LMN', f.read(3))
357
self.assertEquals('LMN', f.read(3))
364
359
def test_seek_from_end(self):
365
360
"""See TestRangeFileMixin.test_seek_from_end."""
372
self.assertEqual('yz', f.read())
367
self.assertEquals('yz', f.read())
373
368
self.assertRaises(errors.InvalidRange, f.seek, -2, 2)
375
370
def test_seek_into_void(self):
387
382
def test_seek_across_ranges(self):
389
384
f.seek(126) # skip the two first ranges
390
self.assertEqual('AB', f.read(2))
385
self.assertEquals('AB', f.read(2))
392
387
def test_checked_read_dont_overflow_buffers(self):
394
389
# We force a very low value to exercise all code paths in _checked_read
395
390
f._discarded_buf_size = 8
396
391
f.seek(126) # skip the two first ranges
397
self.assertEqual('AB', f.read(2))
392
self.assertEquals('AB', f.read(2))
399
394
def test_seek_twice_between_ranges(self):
413
408
def test_read_at_range_end(self):
415
self.assertEqual(self.alpha, f.read())
416
self.assertEqual(self.alpha, f.read())
417
self.assertEqual(self.alpha.upper(), f.read())
410
self.assertEquals(self.alpha, f.read())
411
self.assertEquals(self.alpha, f.read())
412
self.assertEquals(self.alpha.upper(), f.read())
418
413
self.assertRaises(errors.InvalidHttpResponse, f.read, 1)
449
444
def test_seek_whence(self):
450
445
"""Test the seek whence parameter values."""
451
f = response.RangeFile('foo', BytesIO(b'abc'))
446
f = response.RangeFile('foo', StringIO('abc'))
452
447
f.set_range(0, 3)
458
453
def test_range_syntax(self):
459
454
"""Test the Content-Range scanning."""
461
f = response.RangeFile('foo', BytesIO())
456
f = response.RangeFile('foo', StringIO())
463
458
def ok(expected, header_value):
464
459
f.set_range_from_header(header_value)
465
460
# Slightly peek under the covers to get the size
466
self.assertEqual(expected, (f.tell(), f._size))
461
self.assertEquals(expected, (f.tell(), f._size))
468
463
ok((1, 10), 'bytes 1-10/11')
469
464
ok((1, 10), 'bytes 1-10/*')
714
709
class TestHandleResponse(tests.TestCase):
716
711
def _build_HTTPMessage(self, raw_headers):
717
status_and_headers = BytesIO(raw_headers)
712
status_and_headers = StringIO(raw_headers)
718
713
# Get rid of the status line
719
714
status_and_headers.readline()
720
msg = http_client.HTTPMessage(status_and_headers)
715
msg = httplib.HTTPMessage(status_and_headers)
723
718
def get_response(self, a_response):
725
720
code, raw_headers, body = a_response
726
721
msg = self._build_HTTPMessage(raw_headers)
727
722
return response.handle_response('http://foo', code, msg,
728
BytesIO(a_response[2]))
723
StringIO(a_response[2]))
730
725
def test_full_text(self):
731
726
out = self.get_response(_full_text_response)
732
# It is a BytesIO from the original data
727
# It is a StringIO from the original data
733
728
self.assertEqual(_full_text_response[2], out.read())
735
730
def test_single_range(self):
777
772
# We should not require Content-Type for a full response
778
773
code, raw_headers, body = _full_text_response_no_content_type
779
774
msg = self._build_HTTPMessage(raw_headers)
780
out = response.handle_response('http://foo', code, msg, BytesIO(body))
775
out = response.handle_response('http://foo', code, msg, StringIO(body))
781
776
self.assertEqual(body, out.read())
783
778
def test_full_text_no_content_length(self):
784
779
code, raw_headers, body = _full_text_response_no_content_length
785
780
msg = self._build_HTTPMessage(raw_headers)
786
out = response.handle_response('http://foo', code, msg, BytesIO(body))
781
out = response.handle_response('http://foo', code, msg, StringIO(body))
787
782
self.assertEqual(body, out.read())
789
784
def test_missing_content_range(self):
791
786
msg = self._build_HTTPMessage(raw_headers)
792
787
self.assertRaises(errors.InvalidHttpResponse,
793
788
response.handle_response,
794
'http://bogus', code, msg, BytesIO(body))
789
'http://bogus', code, msg, StringIO(body))
796
791
def test_multipart_no_content_range(self):
797
792
code, raw_headers, body = _multipart_no_content_range
798
793
msg = self._build_HTTPMessage(raw_headers)
799
794
self.assertRaises(errors.InvalidHttpResponse,
800
795
response.handle_response,
801
'http://bogus', code, msg, BytesIO(body))
796
'http://bogus', code, msg, StringIO(body))
803
798
def test_multipart_no_boundary(self):
804
799
out = self.get_response(_multipart_no_boundary)