# Copyright (C) 2005 Aaron Bentley, Canonical Ltd
# <aaron.bentley@utoronto.ca>
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA

from __future__ import absolute_import
20
class IterableFileBase(object):
    """Create a file-like object from any iterable of byte strings.

    Chunks are pulled lazily from the iterable and treated as one continuous
    stream.  Data that has been pulled but not yet handed to a caller is kept
    in ``_buffer``; ``done`` becomes True once the iterator is exhausted.
    """

    def __init__(self, iterable):
        """Wrap ``iterable``, whose items are the successive data chunks."""
        object.__init__(self)
        self._iter = iter(iterable)
        # Data already pulled from the iterator but not yet returned.
        self._buffer = b""
        # Flipped to True the first time the iterator raises StopIteration.
        self.done = False

    def read_n(self, length):
        """Return the next ``length`` bytes, or fewer if the data runs out.

        >>> f = IterableFileBase([b'This ', b'is ', b'a ', b'test.'])
        >>> f.read_n(8) == b'This is '
        True
        """
        def test_length(result):
            if len(result) >= length:
                return length
            return None
        return self._read(test_length)

    def read_to(self, sequence, length=None):
        """Read up to and including the first occurrence of ``sequence``.

        If ``length`` is given it caps the size of the result: the read stops
        after ``length`` bytes unless ``sequence`` ends earlier.  When the
        data runs out first, whatever remains is returned.

        >>> f = IterableFileBase([b'Th\\nis ', b'is \\n', b'a ', b'te\\nst.'])
        >>> f.read_to(b'\\n') == b'Th\\n'
        True
        >>> f.read_to(b'\\n') == b'is is \\n'
        True
        >>> f.read_to(b'\\n', 3) == b'a t'
        True
        """
        def test_contents(result):
            found = result.find(sequence)
            end = None if found < 0 else found + len(sequence)
            # The cap only wins when the separator does not finish inside it;
            # otherwise a capped read would run past a separator that is
            # already present in the accumulated data.
            if length is not None and len(result) >= length:
                if end is None or end > length:
                    return length
            return end
        return self._read(test_contents)

    def _read(self, result_length):
        """Read data until result satisfies the condition result_length.

        result_length is a callable that returns None until the condition
        is satisfied, and returns the length of the result to use when
        the condition is satisfied.  (i.e. it returns the length of the
        subset of the first condition match.)

        If the iterator is exhausted before the condition is satisfied,
        everything accumulated so far is returned and ``done`` is set.
        """
        result = self._buffer
        output_length = result_length(result)
        while output_length is None:
            try:
                result += next(self._iter)
            except StopIteration:
                # Nothing more will arrive: hand back what we have.
                self.done = True
                self._buffer = b""
                return result
            output_length = result_length(result)
        # Keep the unread tail for the next call.
        self._buffer = result[output_length:]
        return result[:output_length]

    def read_all(self):
        """Return all remaining data.

        >>> f = IterableFileBase([b'This ', b'is ', b'a ', b'test.'])
        >>> f.read_all() == b'This is a test.'
        True
        """
        def no_stop(result):
            # Never satisfied, so _read runs until the iterator is exhausted.
            return None
        return self._read(no_stop)

    def push_back(self, contents):
        """Put ``contents`` back at the front of the unread data.

        >>> f = IterableFileBase([b'Th\\nis ', b'is \\n', b'a ', b'te\\nst.'])
        >>> f.read_to(b'\\n') == b'Th\\n'
        True
        >>> f.push_back(b"Sh")
        >>> f.read_all() == b'Shis is \\na te\\nst.'
        True
        """
        self._buffer = contents + self._buffer
101
class IterableFile(object):
    """This class supplies all File methods that can be implemented cheaply.

    It is a read-only, file-like view over an iterable of byte-string chunks.
    Lines are delimited by ``b'\\n'``.  Every reading method raises
    ValueError("File is closed.") once close() has been called.
    """

    def __init__(self, iterable):
        """Wrap ``iterable``, whose items are the successive data chunks."""
        object.__init__(self)
        self._file_base = IterableFileBase(iterable)
        self._iter = self._make_iterator()
        self._closed = False
        # Python 2 file-object attribute; provided for file-API compatibility.
        self.softspace = 0

    def _make_iterator(self):
        """Generate the lines of the file, one per iteration."""
        while not self._file_base.done:
            self._check_closed()
            result = self._file_base.read_to(b'\n')
            # The final read at end of data can be empty; it is not a line.
            if result:
                yield result

    def _check_closed(self):
        """Raise ValueError if the file has been closed."""
        if self.closed:
            raise ValueError("File is closed.")

    def close(self):
        """Mark the file closed and stop reading from the iterable.

        >>> f = IterableFile([b'This ', b'is ', b'a ', b'test.'])
        >>> f.closed
        False
        >>> f.close()
        >>> f.closed
        True
        """
        self._file_base.done = True
        self._closed = True

    closed = property(lambda x: x._closed)

    def __enter__(self):
        """Return the file itself so it can be used in a ``with`` block."""
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        """Close the file on leaving a ``with`` block; never hide errors."""
        # If there was an error raised, prefer the original one
        try:
            self.close()
        except BaseException:
            if exc_type is None:
                raise
        return False

    def flush(self):
        """No-op for standard compliance.

        >>> f = IterableFile([])
        >>> f.close()
        >>> f.flush()
        Traceback (most recent call last):
        ValueError: File is closed.
        """
        self._check_closed()

    def __next__(self):
        """Implementation of the iterator protocol's next()

        >>> f = IterableFile([b'This \\n', b'is ', b'a ', b'test.'])
        >>> next(f) == b'This \\n'
        True
        >>> f.close()
        >>> next(f)
        Traceback (most recent call last):
        ValueError: File is closed.
        >>> f = IterableFile([b'This \\n', b'is ', b'a ', b'test.\\n'])
        >>> next(f) == b'This \\n'
        True
        >>> next(f) == b'is a test.\\n'
        True
        >>> next(f)
        Traceback (most recent call last):
        StopIteration
        """
        self._check_closed()
        return next(self._iter)

    # Python 2 spelling of the iterator protocol.
    next = __next__

    def __iter__(self):
        """Return the file itself; iterating yields its lines.

        >>> chunks = [b'Th\\nis ', b'is \\n', b'a ', b'te\\nst.']
        >>> list(IterableFile(chunks)) == [b'Th\\n', b'is is \\n', b'a te\\n', b'st.']
        True
        >>> f = IterableFile(chunks)
        >>> f.close()
        >>> list(f)
        Traceback (most recent call last):
        ValueError: File is closed.
        """
        return self

    def read(self, length=None):
        """Return up to ``length`` bytes, or all remaining data if None.

        >>> IterableFile([b'This ', b'is ', b'a ', b'test.']).read() == b'This is a test.'
        True
        >>> f = IterableFile([b'This ', b'is ', b'a ', b'test.'])
        >>> f.read(10) == b'This is a '
        True
        >>> f = IterableFile([b'This ', b'is ', b'a ', b'test.'])
        >>> f.close()
        >>> f.read(10)
        Traceback (most recent call last):
        ValueError: File is closed.
        """
        self._check_closed()
        if length is None:
            return self._file_base.read_all()
        return self._file_base.read_n(length)

    def read_to(self, sequence, size=None):
        """Read characters until a sequence is found, with optional max size.

        The specified sequence, if found, will be included in the result

        >>> f = IterableFile([b'Th\\nis ', b'is \\n', b'a ', b'te\\nst.'])
        >>> f.read_to(b'i') == b'Th\\ni'
        True
        >>> f.read_to(b'i') == b's i'
        True
        >>> f.close()
        >>> f.read_to(b'i')
        Traceback (most recent call last):
        ValueError: File is closed.
        """
        self._check_closed()
        return self._file_base.read_to(sequence, size)

    def readline(self, size=None):
        """Return the next line, including its newline, up to ``size`` bytes.

        An empty result means the end of the data has been reached.

        >>> f = IterableFile([b'Th\\nis ', b'is \\n', b'a ', b'te\\nst.'])
        >>> f.readline() == b'Th\\n'
        True
        >>> f.readline(4) == b'is i'
        True
        >>> f.close()
        >>> f.readline()
        Traceback (most recent call last):
        ValueError: File is closed.
        """
        self._check_closed()
        return self.read_to(b'\n', size)

    def readlines(self, sizehint=None):
        """Return the remaining lines as a list.

        With ``sizehint``, lines are collected only while each one is shorter
        than the remaining budget; the first line that does not fit is pushed
        back so that a later read returns it.

        >>> f = IterableFile([b'Th\\nis ', b'is \\n', b'a ', b'te\\nst.'])
        >>> f.readlines() == [b'Th\\n', b'is is \\n', b'a te\\n', b'st.']
        True
        >>> f = IterableFile([b'Th\\nis ', b'is \\n', b'a ', b'te\\nst.'])
        >>> f.close()
        >>> f.readlines()
        Traceback (most recent call last):
        ValueError: File is closed.
        """
        self._check_closed()
        lines = []
        while True:
            line = self.readline()
            if not line:
                # End of data.
                return lines
            if sizehint is None:
                lines.append(line)
            elif len(line) < sizehint:
                lines.append(line)
                sizehint -= len(line)
            else:
                # Over budget: leave this line for the next reader.
                self._file_base.push_back(line)
                return lines
270
if __name__ == "__main__":