15
15
# along with this program; if not, write to the Free Software
16
16
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
18
from __future__ import absolute_import
19
20
class IterableFileBase(object):
20
21
"""Create a file-like object from any iterable"""
22
23
def __init__(self, iterable):
23
24
object.__init__(self)
24
25
self._iter = iterable.__iter__()
28
29
def read_n(self, length):
30
>>> IterableFileBase([b'This ', b'is ', b'a ', b'test.']).read_n(8)
31
>>> IterableFileBase(['This ', 'is ', 'a ', 'test.']).read_n(8)
33
34
def test_length(result):
34
35
if len(result) >= length:
40
41
def read_to(self, sequence, length=None):
42
>>> f = IterableFileBase([b'Th\\nis ', b'is \\n', b'a ', b'te\\nst.'])
43
>>> f = IterableFileBase(['Th\\nis ', 'is \\n', 'a ', 'te\\nst.'])
48
49
def test_contents(result):
49
50
if length is not None:
50
51
if len(result) >= length:
53
return result.index(sequence) + len(sequence)
54
return result.index(sequence)+len(sequence)
56
57
return self._read(test_contents)
66
67
result = self._buffer
67
68
while result_length(result) is None:
69
result += next(self._iter)
70
result += self._iter.next()
70
71
except StopIteration:
74
75
output_length = result_length(result)
75
76
self._buffer = result[output_length:]
78
79
def read_all(self):
80
>>> IterableFileBase([b'This ', b'is ', b'a ', b'test.']).read_all()
81
>>> IterableFileBase(['This ', 'is ', 'a ', 'test.']).read_all()
83
84
def no_stop(result):
85
86
return self._read(no_stop)
87
89
def push_back(self, contents):
89
>>> f = IterableFileBase([b'Th\\nis ', b'is \\n', b'a ', b'te\\nst.'])
92
>>> f.push_back(b"Sh")
91
>>> f = IterableFileBase(['Th\\nis ', 'is \\n', 'a ', 'te\\nst.'])
94
b'Shis is \\na te\\nst.'
96
'Shis is \\na te\\nst.'
96
98
self._buffer = contents + self._buffer
99
101
class IterableFile(object):
100
102
"""This class supplies all File methods that can be implemented cheaply."""
102
103
def __init__(self, iterable):
103
104
object.__init__(self)
104
105
self._file_base = IterableFileBase(iterable)
122
>>> f = IterableFile([b'This ', b'is ', b'a ', b'test.'])
123
>>> f = IterableFile(['This ', 'is ', 'a ', 'test.'])
132
133
closed = property(lambda x: x._closed)
137
def __exit__(self, exc_type, exc_val, exc_tb):
138
# If there was an error raised, prefer the original one
141
except BaseException:
147
136
"""No-op for standard compliance.
148
137
>>> f = IterableFile([])
154
143
self._check_closed()
157
146
"""Implementation of the iterator protocol's next()
159
>>> f = IterableFile([b'This \\n', b'is ', b'a ', b'test.'])
148
>>> f = IterableFile(['This \\n', 'is ', 'a ', 'test.'])
164
153
Traceback (most recent call last):
165
154
ValueError: File is closed.
166
>>> f = IterableFile([b'This \\n', b'is ', b'a ', b'test.\\n'])
155
>>> f = IterableFile(['This \\n', 'is ', 'a ', 'test.\\n'])
172
161
Traceback (most recent call last):
175
164
self._check_closed()
176
return next(self._iter)
165
return self._iter.next()
180
167
def __iter__(self):
182
>>> list(IterableFile([b'Th\\nis ', b'is \\n', b'a ', b'te\\nst.']))
183
[b'Th\\n', b'is is \\n', b'a te\\n', b'st.']
184
>>> f = IterableFile([b'Th\\nis ', b'is \\n', b'a ', b'te\\nst.'])
169
>>> list(IterableFile(['Th\\nis ', 'is \\n', 'a ', 'te\\nst.']))
170
['Th\\n', 'is is \\n', 'a te\\n', 'st.']
171
>>> f = IterableFile(['Th\\nis ', 'is \\n', 'a ', 'te\\nst.'])
187
174
Traceback (most recent call last):
192
179
def read(self, length=None):
194
>>> IterableFile([b'This ', b'is ', b'a ', b'test.']).read()
196
>>> f = IterableFile([b'This ', b'is ', b'a ', b'test.'])
181
>>> IterableFile(['This ', 'is ', 'a ', 'test.']).read()
183
>>> f = IterableFile(['This ', 'is ', 'a ', 'test.'])
199
>>> f = IterableFile([b'This ', b'is ', b'a ', b'test.'])
186
>>> f = IterableFile(['This ', 'is ', 'a ', 'test.'])
202
189
Traceback (most recent call last):
213
200
Read characters until a sequence is found, with optional max size.
214
201
The specified sequence, if found, will be included in the result
216
>>> f = IterableFile([b'Th\\nis ', b'is \\n', b'a ', b'te\\nst.'])
203
>>> f = IterableFile(['Th\\nis ', 'is \\n', 'a ', 'te\\nst.'])
223
210
Traceback (most recent call last):
224
211
ValueError: File is closed.
229
216
def readline(self, size=None):
231
>>> f = IterableFile([b'Th\\nis ', b'is \\n', b'a ', b'te\\nst.'])
218
>>> f = IterableFile(['Th\\nis ', 'is \\n', 'a ', 'te\\nst.'])
234
221
>>> f.readline(4)
238
225
Traceback (most recent call last):
239
226
ValueError: File is closed.
241
return self.read_to(b'\n', size)
228
return self.read_to('\n', size)
243
230
def readlines(self, sizehint=None):
245
>>> f = IterableFile([b'Th\\nis ', b'is \\n', b'a ', b'te\\nst.'])
232
>>> f = IterableFile(['Th\\nis ', 'is \\n', 'a ', 'te\\nst.'])
246
233
>>> f.readlines()
247
[b'Th\\n', b'is is \\n', b'a te\\n', b'st.']
248
>>> f = IterableFile([b'Th\\nis ', b'is \\n', b'a ', b'te\\nst.'])
234
['Th\\n', 'is is \\n', 'a te\\n', 'st.']
235
>>> f = IterableFile(['Th\\nis ', 'is \\n', 'a ', 'te\\nst.'])
250
237
>>> f.readlines()
251
238
Traceback (most recent call last):