1
# Copyright (C) 2005 Aaron Bentley, Canonical Ltd
2
# <aaron.bentley@utoronto.ca>
4
# This program is free software; you can redistribute it and/or modify
5
# it under the terms of the GNU General Public License as published by
6
# the Free Software Foundation; either version 2 of the License, or
7
# (at your option) any later version.
9
# This program is distributed in the hope that it will be useful,
10
# but WITHOUT ANY WARRANTY; without even the implied warranty of
11
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12
# GNU General Public License for more details.
14
# You should have received a copy of the GNU General Public License
15
# along with this program; if not, write to the Free Software
16
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
19
class IterableFileBase(object):
    """Create a file-like object from any iterable.

    The iterable is expected to yield ``bytes`` chunks.  Chunks are pulled
    lazily and concatenated until a read condition is met; anything read
    past that point is kept in an internal buffer for the next read.

    ``done`` becomes True once the underlying iterator has been exhausted.
    """

    def __init__(self, iterable):
        """Wrap ``iterable``; nothing is consumed until the first read."""
        super().__init__()
        self._iter = iter(iterable)
        # Data already pulled from the iterator but not yet handed out.
        self._buffer = b""
        self.done = False

    def read_n(self, length):
        """Read ``length`` bytes, or fewer if the data runs out first.

        >>> IterableFileBase([b'This ', b'is ', b'a ', b'test.']).read_n(8)
        b'This is '
        """
        def test_length(result):
            if len(result) >= length:
                return length
            return None
        return self._read(test_length)

    def read_to(self, sequence, length=None):
        """Read up to and including ``sequence``.

        If ``length`` is given, at most ``length`` bytes are returned even
        when ``sequence`` has not been found yet.

        >>> f = IterableFileBase([b'Th\\nis ', b'is \\n', b'a ', b'te\\nst.'])
        >>> f.read_to(b'\\n')
        b'Th\\n'
        >>> f.read_to(b'\\n')
        b'is is \\n'
        """
        def test_contents(result):
            # The size cap takes precedence over the search for sequence.
            if length is not None:
                if len(result) >= length:
                    return length
            try:
                return result.index(sequence) + len(sequence)
            except ValueError:
                return None
        return self._read(test_contents)

    def _read(self, result_length):
        """Read data until result satisfies the condition result_length.

        result_length is a callable that returns None until the condition
        is satisfied, and returns the length of the result to use when
        the condition is satisfied.  (i.e. it returns the length of the
        subset of the first condition match.)

        If the iterator is exhausted before the condition is met, all the
        data gathered so far is returned, the buffer is emptied and
        ``done`` is set to True.
        """
        result = self._buffer
        output_length = result_length(result)
        while output_length is None:
            try:
                result += next(self._iter)
            except StopIteration:
                self.done = True
                self._buffer = b""
                return result
            output_length = result_length(result)
        # Keep the unread tail for the next call.
        self._buffer = result[output_length:]
        return result[:output_length]

    def read_all(self):
        """Read everything that remains.

        >>> IterableFileBase([b'This ', b'is ', b'a ', b'test.']).read_all()
        b'This is a test.'
        """
        def no_stop(result):
            # Never satisfied, so _read runs until the iterator is spent.
            return None
        return self._read(no_stop)

    def push_back(self, contents):
        """Put ``contents`` back so that the next read returns it first.

        >>> f = IterableFileBase([b'Th\\nis ', b'is \\n', b'a ', b'te\\nst.'])
        >>> f.read_to(b'\\n')
        b'Th\\n'
        >>> f.push_back(b"Sh")
        >>> f.read_all()
        b'Shis is \\na te\\nst.'
        """
        self._buffer = contents + self._buffer
99
class IterableFile(object):
    """This class supplies all File methods that can be implemented cheaply.

    It wraps an iterable of ``bytes`` chunks in an ``IterableFileBase`` and
    adds the usual file behaviour on top: line iteration, ``read``,
    ``readline``, ``readlines``, ``close`` and context-manager support.
    Reading from a closed file raises ``ValueError``.
    """

    def __init__(self, iterable):
        """Wrap ``iterable``; nothing is consumed until the first read."""
        super().__init__()
        self._file_base = IterableFileBase(iterable)
        self._iter = self._make_iterator()
        self._closed = False
        # Legacy file-object attribute, kept for compatibility.
        self.softspace = 0

    def _make_iterator(self):
        """Yield the remaining data one line at a time."""
        while not self._file_base.done:
            self._check_closed()
            result = self._file_base.read_to(b'\n')
            # An empty read only happens once the data has run out.
            if result != b'':
                yield result

    def _check_closed(self):
        """Raise ValueError if the file has been closed."""
        if self.closed:
            raise ValueError("File is closed.")

    def close(self):
        """Mark the file as closed; later reads raise ValueError.

        >>> f = IterableFile([b'This ', b'is ', b'a ', b'test.'])
        >>> f.closed
        False
        >>> f.close()
        >>> f.closed
        True
        """
        self._file_base.done = True
        self._closed = True

    @property
    def closed(self):
        """True once close() has been called."""
        return self._closed

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        # If there was an error raised, prefer the original one
        try:
            self.close()
        except BaseException:
            if exc_type is None:
                raise
        return False

    def flush(self):
        """No-op for standard compliance.

        >>> f = IterableFile([])
        >>> f.close()
        >>> f.flush()
        Traceback (most recent call last):
        ValueError: File is closed.
        """
        self._check_closed()

    def __next__(self):
        """Implementation of the iterator protocol's next()

        >>> f = IterableFile([b'This \\n', b'is ', b'a ', b'test.'])
        >>> next(f)
        b'This \\n'
        >>> f.close()
        >>> next(f)
        Traceback (most recent call last):
        ValueError: File is closed.
        >>> f = IterableFile([b'This \\n', b'is ', b'a ', b'test.\\n'])
        >>> next(f)
        b'This \\n'
        >>> next(f)
        b'is a test.\\n'
        >>> next(f)
        Traceback (most recent call last):
        StopIteration
        """
        self._check_closed()
        return next(self._iter)

    # Alias for callers that use the older ``f.next()`` spelling.
    next = __next__

    def __iter__(self):
        """Return self; iteration yields lines.

        >>> list(IterableFile([b'Th\\nis ', b'is \\n', b'a ', b'te\\nst.']))
        [b'Th\\n', b'is is \\n', b'a te\\n', b'st.']
        >>> f = IterableFile([b'Th\\nis ', b'is \\n', b'a ', b'te\\nst.'])
        >>> f.close()
        >>> list(f)
        Traceback (most recent call last):
        ValueError: File is closed.
        """
        return self

    def read(self, length=None):
        """Read ``length`` bytes, or everything when ``length`` is None.

        >>> IterableFile([b'This ', b'is ', b'a ', b'test.']).read()
        b'This is a test.'
        >>> f = IterableFile([b'This ', b'is ', b'a ', b'test.'])
        >>> f.read(10)
        b'This is a '
        >>> f = IterableFile([b'This ', b'is ', b'a ', b'test.'])
        >>> f.close()
        >>> f.read(10)
        Traceback (most recent call last):
        ValueError: File is closed.
        """
        self._check_closed()
        if length is None:
            return self._file_base.read_all()
        return self._file_base.read_n(length)

    def read_to(self, sequence, size=None):
        """
        Read characters until a sequence is found, with optional max size.
        The specified sequence, if found, will be included in the result

        >>> f = IterableFile([b'Th\\nis ', b'is \\n', b'a ', b'te\\nst.'])
        >>> f.read_to(b'i')
        b'Th\\ni'
        >>> f.read_to(b'i')
        b's i'
        >>> f.close()
        >>> f.read_to(b'i')
        Traceback (most recent call last):
        ValueError: File is closed.
        """
        self._check_closed()
        return self._file_base.read_to(sequence, size)

    def readline(self, size=None):
        """Read one line, returning at most ``size`` bytes when given.

        >>> f = IterableFile([b'Th\\nis ', b'is \\n', b'a ', b'te\\nst.'])
        >>> f.readline()
        b'Th\\n'
        >>> f.readline(4)
        b'is i'
        >>> f.close()
        >>> f.readline()
        Traceback (most recent call last):
        ValueError: File is closed.
        """
        return self.read_to(b'\n', size)

    def readlines(self, sizehint=None):
        """Read the remaining lines into a list.

        When ``sizehint`` is given, reading stops before the first line
        that would reach the remaining budget; that line is pushed back
        so a later read still returns it.

        >>> f = IterableFile([b'Th\\nis ', b'is \\n', b'a ', b'te\\nst.'])
        >>> f.readlines()
        [b'Th\\n', b'is is \\n', b'a te\\n', b'st.']
        >>> f = IterableFile([b'Th\\nis ', b'is \\n', b'a ', b'te\\nst.'])
        >>> f.close()
        >>> f.readlines()
        Traceback (most recent call last):
        ValueError: File is closed.
        """
        lines = []
        while True:
            line = self.readline()
            if line == b"":
                return lines
            if sizehint is None:
                lines.append(line)
            elif len(line) < sizehint:
                lines.append(line)
                sizehint -= len(line)
            else:
                # Over budget: return the line to the buffer unread.
                self._file_base.push_back(line)
                return lines
269
if __name__ == "__main__":