14
14
# You should have received a copy of the GNU General Public License
15
15
# along with this program; if not, write to the Free Software
16
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
16
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
19
19
class IterableFileBase(object):
22
22
def __init__(self, iterable):
23
23
object.__init__(self)
24
24
self._iter = iterable.__iter__()
28
28
def read_n(self, length):
30
>>> IterableFileBase([b'This ', b'is ', b'a ', b'test.']).read_n(8)
30
>>> IterableFileBase(['This ', 'is ', 'a ', 'test.']).read_n(8)
33
33
def test_length(result):
34
34
if len(result) >= length:
40
40
def read_to(self, sequence, length=None):
42
>>> f = IterableFileBase([b'Th\\nis ', b'is \\n', b'a ', b'te\\nst.'])
42
>>> f = IterableFileBase(['Th\\nis ', 'is \\n', 'a ', 'te\\nst.'])
48
48
def test_contents(result):
49
49
if length is not None:
50
50
if len(result) >= length:
53
return result.index(sequence) + len(sequence)
53
return result.index(sequence)+len(sequence)
56
56
return self._read(test_contents)
66
66
result = self._buffer
67
67
while result_length(result) is None:
69
result += next(self._iter)
69
result += self._iter.next()
70
70
except StopIteration:
74
74
output_length = result_length(result)
75
75
self._buffer = result[output_length:]
78
78
def read_all(self):
80
>>> IterableFileBase([b'This ', b'is ', b'a ', b'test.']).read_all()
80
>>> IterableFileBase(['This ', 'is ', 'a ', 'test.']).read_all()
83
83
def no_stop(result):
85
85
return self._read(no_stop)
87
88
def push_back(self, contents):
89
>>> f = IterableFileBase([b'Th\\nis ', b'is \\n', b'a ', b'te\\nst.'])
92
>>> f.push_back(b"Sh")
90
>>> f = IterableFileBase(['Th\\nis ', 'is \\n', 'a ', 'te\\nst.'])
94
b'Shis is \\na te\\nst.'
95
'Shis is \\na te\\nst.'
96
97
self._buffer = contents + self._buffer
99
100
class IterableFile(object):
100
101
"""This class supplies all File methods that can be implemented cheaply."""
102
102
def __init__(self, iterable):
103
103
object.__init__(self)
104
104
self._file_base = IterableFileBase(iterable)
122
>>> f = IterableFile([b'This ', b'is ', b'a ', b'test.'])
122
>>> f = IterableFile(['This ', 'is ', 'a ', 'test.'])
132
132
closed = property(lambda x: x._closed)
137
def __exit__(self, exc_type, exc_val, exc_tb):
138
# If there was an error raised, prefer the original one
141
except BaseException:
147
135
"""No-op for standard compliance.
148
136
>>> f = IterableFile([])
154
142
self._check_closed()
157
145
"""Implementation of the iterator protocol's next()
159
>>> f = IterableFile([b'This \\n', b'is ', b'a ', b'test.'])
147
>>> f = IterableFile(['This \\n', 'is ', 'a ', 'test.'])
164
152
Traceback (most recent call last):
165
153
ValueError: File is closed.
166
>>> f = IterableFile([b'This \\n', b'is ', b'a ', b'test.\\n'])
154
>>> f = IterableFile(['This \\n', 'is ', 'a ', 'test.\\n'])
172
160
Traceback (most recent call last):
175
163
self._check_closed()
176
return next(self._iter)
164
return self._iter.next()
180
166
def __iter__(self):
182
>>> list(IterableFile([b'Th\\nis ', b'is \\n', b'a ', b'te\\nst.']))
183
[b'Th\\n', b'is is \\n', b'a te\\n', b'st.']
184
>>> f = IterableFile([b'Th\\nis ', b'is \\n', b'a ', b'te\\nst.'])
168
>>> list(IterableFile(['Th\\nis ', 'is \\n', 'a ', 'te\\nst.']))
169
['Th\\n', 'is is \\n', 'a te\\n', 'st.']
170
>>> f = IterableFile(['Th\\nis ', 'is \\n', 'a ', 'te\\nst.'])
187
173
Traceback (most recent call last):
192
178
def read(self, length=None):
194
>>> IterableFile([b'This ', b'is ', b'a ', b'test.']).read()
196
>>> f = IterableFile([b'This ', b'is ', b'a ', b'test.'])
180
>>> IterableFile(['This ', 'is ', 'a ', 'test.']).read()
182
>>> f = IterableFile(['This ', 'is ', 'a ', 'test.'])
199
>>> f = IterableFile([b'This ', b'is ', b'a ', b'test.'])
185
>>> f = IterableFile(['This ', 'is ', 'a ', 'test.'])
202
188
Traceback (most recent call last):
213
199
Read characters until a sequence is found, with optional max size.
214
200
The specified sequence, if found, will be included in the result
216
>>> f = IterableFile([b'Th\\nis ', b'is \\n', b'a ', b'te\\nst.'])
202
>>> f = IterableFile(['Th\\nis ', 'is \\n', 'a ', 'te\\nst.'])
223
209
Traceback (most recent call last):
224
210
ValueError: File is closed.
229
215
def readline(self, size=None):
231
>>> f = IterableFile([b'Th\\nis ', b'is \\n', b'a ', b'te\\nst.'])
217
>>> f = IterableFile(['Th\\nis ', 'is \\n', 'a ', 'te\\nst.'])
234
220
>>> f.readline(4)
238
224
Traceback (most recent call last):
239
225
ValueError: File is closed.
241
return self.read_to(b'\n', size)
227
return self.read_to('\n', size)
243
229
def readlines(self, sizehint=None):
245
>>> f = IterableFile([b'Th\\nis ', b'is \\n', b'a ', b'te\\nst.'])
231
>>> f = IterableFile(['Th\\nis ', 'is \\n', 'a ', 'te\\nst.'])
246
232
>>> f.readlines()
247
[b'Th\\n', b'is is \\n', b'a te\\n', b'st.']
248
>>> f = IterableFile([b'Th\\nis ', b'is \\n', b'a ', b'te\\nst.'])
233
['Th\\n', 'is is \\n', 'a te\\n', 'st.']
234
>>> f = IterableFile(['Th\\nis ', 'is \\n', 'a ', 'te\\nst.'])
250
236
>>> f.readlines()
251
237
Traceback (most recent call last):