15
15
# along with this program; if not, write to the Free Software
16
16
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
18
from __future__ import absolute_import
20
19
class IterableFileBase(object):
21
20
"""Create a file-like object from any iterable"""
23
22
def __init__(self, iterable):
24
23
object.__init__(self)
25
24
self._iter = iterable.__iter__()
29
28
def read_n(self, length):
31
>>> IterableFileBase(['This ', 'is ', 'a ', 'test.']).read_n(8)
30
>>> IterableFileBase([b'This ', b'is ', b'a ', b'test.']).read_n(8)
34
33
def test_length(result):
35
34
if len(result) >= length:
41
40
def read_to(self, sequence, length=None):
43
>>> f = IterableFileBase(['Th\\nis ', 'is \\n', 'a ', 'te\\nst.'])
42
>>> f = IterableFileBase([b'Th\\nis ', b'is \\n', b'a ', b'te\\nst.'])
49
48
def test_contents(result):
50
49
if length is not None:
51
50
if len(result) >= length:
54
return result.index(sequence)+len(sequence)
53
return result.index(sequence) + len(sequence)
57
56
return self._read(test_contents)
67
66
result = self._buffer
68
67
while result_length(result) is None:
70
result += self._iter.next()
69
result += next(self._iter)
71
70
except StopIteration:
75
74
output_length = result_length(result)
76
75
self._buffer = result[output_length:]
79
78
def read_all(self):
81
>>> IterableFileBase(['This ', 'is ', 'a ', 'test.']).read_all()
80
>>> IterableFileBase([b'This ', b'is ', b'a ', b'test.']).read_all()
84
83
def no_stop(result):
86
85
return self._read(no_stop)
89
87
def push_back(self, contents):
91
>>> f = IterableFileBase(['Th\\nis ', 'is \\n', 'a ', 'te\\nst.'])
89
>>> f = IterableFileBase([b'Th\\nis ', b'is \\n', b'a ', b'te\\nst.'])
92
>>> f.push_back(b"Sh")
96
'Shis is \\na te\\nst.'
94
b'Shis is \\na te\\nst.'
98
96
self._buffer = contents + self._buffer
101
99
class IterableFile(object):
102
100
"""This class supplies all File methods that can be implemented cheaply."""
103
102
def __init__(self, iterable):
104
103
object.__init__(self)
105
104
self._file_base = IterableFileBase(iterable)
123
>>> f = IterableFile(['This ', 'is ', 'a ', 'test.'])
122
>>> f = IterableFile([b'This ', b'is ', b'a ', b'test.'])
133
132
closed = property(lambda x: x._closed)
137
def __exit__(self, exc_type, exc_val, exc_tb):
138
# If there was an error raised, prefer the original one
141
except BaseException:
136
147
"""No-op for standard compliance.
137
148
>>> f = IterableFile([])
143
154
self._check_closed()
146
157
"""Implementation of the iterator protocol's next()
148
>>> f = IterableFile(['This \\n', 'is ', 'a ', 'test.'])
159
>>> f = IterableFile([b'This \\n', b'is ', b'a ', b'test.'])
153
164
Traceback (most recent call last):
154
165
ValueError: File is closed.
155
>>> f = IterableFile(['This \\n', 'is ', 'a ', 'test.\\n'])
166
>>> f = IterableFile([b'This \\n', b'is ', b'a ', b'test.\\n'])
161
172
Traceback (most recent call last):
164
175
self._check_closed()
165
return self._iter.next()
176
return next(self._iter)
167
180
def __iter__(self):
169
>>> list(IterableFile(['Th\\nis ', 'is \\n', 'a ', 'te\\nst.']))
170
['Th\\n', 'is is \\n', 'a te\\n', 'st.']
171
>>> f = IterableFile(['Th\\nis ', 'is \\n', 'a ', 'te\\nst.'])
182
>>> list(IterableFile([b'Th\\nis ', b'is \\n', b'a ', b'te\\nst.']))
183
[b'Th\\n', b'is is \\n', b'a te\\n', b'st.']
184
>>> f = IterableFile([b'Th\\nis ', b'is \\n', b'a ', b'te\\nst.'])
174
187
Traceback (most recent call last):
179
192
def read(self, length=None):
181
>>> IterableFile(['This ', 'is ', 'a ', 'test.']).read()
183
>>> f = IterableFile(['This ', 'is ', 'a ', 'test.'])
194
>>> IterableFile([b'This ', b'is ', b'a ', b'test.']).read()
196
>>> f = IterableFile([b'This ', b'is ', b'a ', b'test.'])
186
>>> f = IterableFile(['This ', 'is ', 'a ', 'test.'])
199
>>> f = IterableFile([b'This ', b'is ', b'a ', b'test.'])
189
202
Traceback (most recent call last):
200
213
Read characters until a sequence is found, with optional max size.
201
214
The specified sequence, if found, will be included in the result
203
>>> f = IterableFile(['Th\\nis ', 'is \\n', 'a ', 'te\\nst.'])
216
>>> f = IterableFile([b'Th\\nis ', b'is \\n', b'a ', b'te\\nst.'])
210
223
Traceback (most recent call last):
211
224
ValueError: File is closed.
216
229
def readline(self, size=None):
218
>>> f = IterableFile(['Th\\nis ', 'is \\n', 'a ', 'te\\nst.'])
231
>>> f = IterableFile([b'Th\\nis ', b'is \\n', b'a ', b'te\\nst.'])
221
234
>>> f.readline(4)
225
238
Traceback (most recent call last):
226
239
ValueError: File is closed.
228
return self.read_to('\n', size)
241
return self.read_to(b'\n', size)
230
243
def readlines(self, sizehint=None):
232
>>> f = IterableFile(['Th\\nis ', 'is \\n', 'a ', 'te\\nst.'])
245
>>> f = IterableFile([b'Th\\nis ', b'is \\n', b'a ', b'te\\nst.'])
233
246
>>> f.readlines()
234
['Th\\n', 'is is \\n', 'a te\\n', 'st.']
235
>>> f = IterableFile(['Th\\nis ', 'is \\n', 'a ', 'te\\nst.'])
247
[b'Th\\n', b'is is \\n', b'a te\\n', b'st.']
248
>>> f = IterableFile([b'Th\\nis ', b'is \\n', b'a ', b'te\\nst.'])
237
250
>>> f.readlines()
238
251
Traceback (most recent call last):