1
 
# Copyright (C) 2006-2010 Canonical Ltd
 
 
1
# Copyright (C) 2005, 2006, 2007 Canonical Ltd
 
3
3
# This program is free software; you can redistribute it and/or modify
 
4
4
# it under the terms of the GNU General Public License as published by
 
 
13
13
# You should have received a copy of the GNU General Public License
 
14
14
# along with this program; if not, write to the Free Software
 
15
 
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
 
 
15
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
 
17
17
"""Tests from HTTP response parsing.
 
 
62
59
    def makefile(self, mode='r', bufsize=None):
 
63
60
        return self.readfile
 
66
62
class FakeHTTPConnection(_urllib2_wrappers.HTTPConnection):
 
68
64
    def __init__(self, sock):
 
 
96
92
        # Override the thresold to force the warning emission
 
97
93
        conn._range_warning_thresold = 6 # There are 7 bytes pending
 
98
94
        conn.cleanup_pipe()
 
99
 
        self.assertContainsRe(self.get_log(), 'Got a 200 response when asking')
 
 
95
        self.assertContainsRe(self._get_log(keep_log_file=True),
 
 
96
                              'Got a 200 response when asking')
 
102
99
class TestRangeFileMixin(object):
 
 
224
221
        self.assertEquals('', f.read(0))
 
225
222
        self.assertEquals('', f.read(1))
 
228
224
class TestRangeFileSizeKnown(tests.TestCase, TestRangeFileMixin):
 
229
225
    """Test a RangeFile for a whole file whose size is known."""
 
 
253
249
        f._pos = 0 # Force an invalid pos
 
254
250
        self.assertRaises(errors.InvalidRange, f.read, 2)
 
257
252
class TestRangeFileMultipleRanges(tests.TestCase, TestRangeFileMixin):
 
258
253
    """Test a RangeFile for multiple ranges.
 
 
267
262
    fact) in real uses but may lead to hard to track bugs.
 
270
 
    # The following is used to represent the boundary paramter defined
 
271
 
    # in HTTP response headers and the boundary lines that separate
 
274
 
    boundary = "separation"
 
277
266
        super(TestRangeFileMultipleRanges, self).setUp()
 
279
 
        boundary = self.boundary
 
 
268
        boundary = 'separation'
 
282
271
        self.first_range_start = 25
 
 
288
277
            content += self._multipart_byterange(part, start, boundary,
 
291
 
        content += self._boundary_line()
 
 
280
        content += self._boundary_line(boundary)
 
293
282
        self._file = response.RangeFile('Multiple_ranges_file',
 
294
283
                                        StringIO(content))
 
295
 
        self.set_file_boundary()
 
297
 
    def _boundary_line(self):
 
298
 
        """Helper to build the formatted boundary line."""
 
299
 
        return '--' + self.boundary + '\r\n'
 
301
 
    def set_file_boundary(self):
 
302
284
        # Ranges are set by decoding the range headers, the RangeFile user is
 
303
285
        # supposed to call the following before using seek or read since it
 
304
286
        # requires knowing the *response* headers (in that case the boundary
 
305
287
        # which is part of the Content-Type header).
 
306
 
        self._file.set_boundary(self.boundary)
 
 
288
        self._file.set_boundary(boundary)
 
 
290
    def _boundary_line(self, boundary):
 
 
291
        """Helper to build the formatted boundary line."""
 
 
292
        return '--' + boundary + '\r\n'
 
308
294
    def _multipart_byterange(self, data, offset, boundary, file_size='*'):
 
309
295
        """Encode a part of a file as a multipart/byterange MIME type.
 
 
321
307
        :return: a string containing the data encoded as it will appear in the
 
322
308
            HTTP response body.
 
324
 
        bline = self._boundary_line()
 
 
310
        bline = self._boundary_line(boundary)
 
325
311
        # Each range begins with a boundary line
 
327
313
        # A range is described by a set of headers, but only 'Content-Range' is
 
 
405
391
        self.assertRaises(errors.InvalidHttpResponse, f.read, 1)
 
408
 
class TestRangeFileMultipleRangesQuotedBoundaries(TestRangeFileMultipleRanges):
 
409
 
    """Perform the same tests as TestRangeFileMultipleRanges, but uses
 
410
 
    an angle-bracket quoted boundary string like IIS 6.0 and 7.0
 
411
 
    (but not IIS 5, which breaks the RFC in a different way
 
412
 
    by using square brackets, not angle brackets)
 
414
 
    This reveals a bug caused by
 
416
 
    - The bad implementation of RFC 822 unquoting in Python (angles are not
 
417
 
      quotes), coupled with
 
419
 
    - The bad implementation of RFC 2046 in IIS (angles are not permitted chars
 
423
 
    # The boundary as it appears in boundary lines
 
424
 
    # IIS 6 and 7 use this value
 
425
 
    _boundary_trimmed = "q1w2e3r4t5y6u7i8o9p0zaxscdvfbgnhmjklkl"
 
426
 
    boundary = '<' + _boundary_trimmed + '>'
 
428
 
    def set_file_boundary(self):
 
429
 
        # Emulate broken rfc822.unquote() here by removing angles
 
430
 
        self._file.set_boundary(self._boundary_trimmed)
 
433
394
class TestRangeFileVarious(tests.TestCase):
 
434
395
    """Tests RangeFile aspects not covered elsewhere."""
 
 
792
753
        out.read()  # Read the whole range
 
793
754
        # Fail to find the boundary line
 
794
755
        self.assertRaises(errors.InvalidHttpResponse, out.seek, 1, 1)
 
797
 
class TestRangeFileSizeReadLimited(tests.TestCase):
 
798
 
    """Test RangeFile _max_read_size functionality which limits the size of
 
799
 
    read blocks to prevent MemoryError messages in socket.recv.
 
803
 
        tests.TestCase.setUp(self)
 
804
 
        # create a test datablock larger than _max_read_size.
 
805
 
        chunk_size = response.RangeFile._max_read_size
 
806
 
        test_pattern = '0123456789ABCDEF'
 
807
 
        self.test_data =  test_pattern * (3 * chunk_size / len(test_pattern))
 
808
 
        self.test_data_len = len(self.test_data)
 
810
 
    def test_max_read_size(self):
 
811
 
        """Read data in blocks and verify that the reads are not larger than
 
812
 
           the maximum read size.
 
814
 
        # retrieve data in large blocks from response.RangeFile object
 
815
 
        mock_read_file = FakeReadFile(self.test_data)
 
816
 
        range_file = response.RangeFile('test_max_read_size', mock_read_file)
 
817
 
        response_data = range_file.read(self.test_data_len)
 
819
 
        # verify read size was equal to the maximum read size
 
820
 
        self.assertTrue(mock_read_file.get_max_read_size() > 0)
 
821
 
        self.assertEqual(mock_read_file.get_max_read_size(),
 
822
 
                         response.RangeFile._max_read_size)
 
823
 
        self.assertEqual(mock_read_file.get_read_count(), 3)
 
825
 
        # report error if the data wasn't equal (we only report the size due
 
826
 
        # to the length of the data)
 
827
 
        if response_data != self.test_data:
 
828
 
            message = "Data not equal.  Expected %d bytes, received %d."
 
829
 
            self.fail(message % (len(response_data), self.test_data_len))