22
22
def __init__(self, iterable):
23
23
object.__init__(self)
24
24
self._iter = iterable.__iter__()
28
28
def read_n(self, length):
30
>>> IterableFileBase(['This ', 'is ', 'a ', 'test.']).read_n(8)
30
>>> IterableFileBase([b'This ', b'is ', b'a ', b'test.']).read_n(8)
33
33
def test_length(result):
34
34
if len(result) >= length:
40
40
def read_to(self, sequence, length=None):
42
>>> f = IterableFileBase(['Th\\nis ', 'is \\n', 'a ', 'te\\nst.'])
42
>>> f = IterableFileBase([b'Th\\nis ', b'is \\n', b'a ', b'te\\nst.'])
48
48
def test_contents(result):
49
49
if length is not None:
50
50
if len(result) >= length:
53
return result.index(sequence)+len(sequence)
53
return result.index(sequence) + len(sequence)
56
56
return self._read(test_contents)
66
66
result = self._buffer
67
67
while result_length(result) is None:
69
result += self._iter.next()
69
result += next(self._iter)
70
70
except StopIteration:
74
74
output_length = result_length(result)
75
75
self._buffer = result[output_length:]
78
78
def read_all(self):
80
>>> IterableFileBase(['This ', 'is ', 'a ', 'test.']).read_all()
80
>>> IterableFileBase([b'This ', b'is ', b'a ', b'test.']).read_all()
83
83
def no_stop(result):
85
85
return self._read(no_stop)
88
87
def push_back(self, contents):
90
>>> f = IterableFileBase(['Th\\nis ', 'is \\n', 'a ', 'te\\nst.'])
89
>>> f = IterableFileBase([b'Th\\nis ', b'is \\n', b'a ', b'te\\nst.'])
92
>>> f.push_back(b"Sh")
95
'Shis is \\na te\\nst.'
94
b'Shis is \\na te\\nst.'
97
96
self._buffer = contents + self._buffer
100
99
class IterableFile(object):
101
100
"""This class supplies all File methods that can be implemented cheaply."""
102
102
def __init__(self, iterable):
103
103
object.__init__(self)
104
104
self._file_base = IterableFileBase(iterable)
122
>>> f = IterableFile(['This ', 'is ', 'a ', 'test.'])
122
>>> f = IterableFile([b'This ', b'is ', b'a ', b'test.'])
132
132
closed = property(lambda x: x._closed)
137
def __exit__(self, exc_type, exc_val, exc_tb):
138
# If there was an error raised, prefer the original one
141
except BaseException:
135
147
"""No-op for standard compliance.
136
148
>>> f = IterableFile([])
142
154
self._check_closed()
145
157
"""Implementation of the iterator protocol's next()
147
>>> f = IterableFile(['This \\n', 'is ', 'a ', 'test.'])
159
>>> f = IterableFile([b'This \\n', b'is ', b'a ', b'test.'])
152
164
Traceback (most recent call last):
153
165
ValueError: File is closed.
154
>>> f = IterableFile(['This \\n', 'is ', 'a ', 'test.\\n'])
166
>>> f = IterableFile([b'This \\n', b'is ', b'a ', b'test.\\n'])
160
172
Traceback (most recent call last):
163
175
self._check_closed()
164
return self._iter.next()
176
return next(self._iter)
166
180
def __iter__(self):
168
>>> list(IterableFile(['Th\\nis ', 'is \\n', 'a ', 'te\\nst.']))
169
['Th\\n', 'is is \\n', 'a te\\n', 'st.']
170
>>> f = IterableFile(['Th\\nis ', 'is \\n', 'a ', 'te\\nst.'])
182
>>> list(IterableFile([b'Th\\nis ', b'is \\n', b'a ', b'te\\nst.']))
183
[b'Th\\n', b'is is \\n', b'a te\\n', b'st.']
184
>>> f = IterableFile([b'Th\\nis ', b'is \\n', b'a ', b'te\\nst.'])
173
187
Traceback (most recent call last):
178
192
def read(self, length=None):
180
>>> IterableFile(['This ', 'is ', 'a ', 'test.']).read()
182
>>> f = IterableFile(['This ', 'is ', 'a ', 'test.'])
194
>>> IterableFile([b'This ', b'is ', b'a ', b'test.']).read()
196
>>> f = IterableFile([b'This ', b'is ', b'a ', b'test.'])
185
>>> f = IterableFile(['This ', 'is ', 'a ', 'test.'])
199
>>> f = IterableFile([b'This ', b'is ', b'a ', b'test.'])
188
202
Traceback (most recent call last):
199
213
Read characters until a sequence is found, with optional max size.
200
214
The specified sequence, if found, will be included in the result
202
>>> f = IterableFile(['Th\\nis ', 'is \\n', 'a ', 'te\\nst.'])
216
>>> f = IterableFile([b'Th\\nis ', b'is \\n', b'a ', b'te\\nst.'])
209
223
Traceback (most recent call last):
210
224
ValueError: File is closed.
215
229
def readline(self, size=None):
217
>>> f = IterableFile(['Th\\nis ', 'is \\n', 'a ', 'te\\nst.'])
231
>>> f = IterableFile([b'Th\\nis ', b'is \\n', b'a ', b'te\\nst.'])
220
234
>>> f.readline(4)
224
238
Traceback (most recent call last):
225
239
ValueError: File is closed.
227
return self.read_to('\n', size)
241
return self.read_to(b'\n', size)
229
243
def readlines(self, sizehint=None):
231
>>> f = IterableFile(['Th\\nis ', 'is \\n', 'a ', 'te\\nst.'])
245
>>> f = IterableFile([b'Th\\nis ', b'is \\n', b'a ', b'te\\nst.'])
232
246
>>> f.readlines()
233
['Th\\n', 'is is \\n', 'a te\\n', 'st.']
234
>>> f = IterableFile(['Th\\nis ', 'is \\n', 'a ', 'te\\nst.'])
247
[b'Th\\n', b'is is \\n', b'a te\\n', b'st.']
248
>>> f = IterableFile([b'Th\\nis ', b'is \\n', b'a ', b'te\\nst.'])
236
250
>>> f.readlines()
237
251
Traceback (most recent call last):