1
# Copyright (C) 2005 Aaron Bentley, Canonical Ltd
2
# <aaron.bentley@utoronto.ca>
4
# This program is free software; you can redistribute it and/or modify
5
# it under the terms of the GNU General Public License as published by
6
# the Free Software Foundation; either version 2 of the License, or
7
# (at your option) any later version.
9
# This program is distributed in the hope that it will be useful,
10
# but WITHOUT ANY WARRANTY; without even the implied warranty of
11
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12
# GNU General Public License for more details.
14
# You should have received a copy of the GNU General Public License
15
# along with this program; if not, write to the Free Software
16
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
18
from __future__ import absolute_import
21
class IterableFileBase(object):
    """Create a file-like object from any iterable

    The iterable yields chunks of text (or chunks of bytes). Chunks are
    concatenated on demand; whatever a read fetches beyond what it returns
    is kept in an internal buffer and served first by the next read.

    ``done`` is False until the underlying iterator has been exhausted by a
    read, and True afterwards.
    """

    def __init__(self, iterable):
        object.__init__(self)
        # iter() also accepts objects that only support the sequence protocol.
        self._iter = iter(iterable)
        # Data already pulled from the iterator but not yet returned.
        self._buffer = ""
        self.done = False

    def read_n(self, length):
        """Return the next ``length`` items of data.

        Fewer are returned only when the iterable runs out first. A negative
        ``length`` means "everything that is left".

        >>> IterableFileBase(['This ', 'is ', 'a ', 'test.']).read_n(8)
        'This is '
        """
        if length < 0:
            return self.read_all()

        def test_length(result):
            if len(result) >= length:
                return length
            return None
        return self._read(test_length)

    def read_to(self, sequence, length=None):
        """Read up to and including the next occurrence of ``sequence``.

        :param sequence: separator to look for; it is part of the result
            when found. An ASCII separator may be given as text or bytes
            regardless of the type of the data.
        :param length: optional cap on the size of the result. None or a
            negative value means no cap. The separator is only looked for
            inside the cap, so a capped read never runs past a separator.
        :return: the data read; everything that is left when the iterable is
            exhausted before either stop condition is met.

        >>> f = IterableFileBase(['Th\\nis ', 'is \\n', 'a ', 'te\\nst.'])
        >>> f.read_to('\\n')
        'Th\\n'
        >>> f.read_to('\\n')
        'is is \\n'
        """
        if length is not None and length < 0:
            length = None

        def test_contents(result):
            if not result:
                # Nothing to search yet (and the data type is still
                # unknown): only an empty separator or a zero cap can
                # already be satisfied.
                if len(sequence) == 0 or length == 0:
                    return 0
                return None
            # Done outside the try below: a separator that cannot be
            # converted must raise, not be mistaken for "not found".
            needle = self._like(sequence, result)
            window = result if length is None else result[:length]
            try:
                return window.index(needle) + len(needle)
            except ValueError:
                pass
            if length is not None and len(result) >= length:
                return length
            return None
        return self._read(test_contents)

    @staticmethod
    def _like(sequence, sample):
        """Return ``sequence`` as the same kind (text or bytes) as ``sample``.

        Conversion uses ASCII and raises UnicodeError for anything else.
        """
        wants_bytes = isinstance(sample, (bytes, bytearray))
        has_bytes = isinstance(sequence, (bytes, bytearray))
        if wants_bytes and not has_bytes:
            return sequence.encode('ascii')
        if has_bytes and not wants_bytes:
            return sequence.decode('ascii')
        return sequence

    def _read(self, result_length):
        """
        Read data until result satisfies the condition result_length.

        result_length is a callable that returns None until the condition
        is satisfied, and returns the length of the result to use when
        the condition is satisfied. (i.e. it returns the length of the
        subset of the first condition match.)

        When the iterable is exhausted first, everything gathered so far is
        returned and ``done`` is set.
        """
        result = self._buffer
        cut = result_length(result)
        while cut is None:
            try:
                chunk = next(self._iter)
            except StopIteration:
                self.done = True
                # Leave an empty buffer of the same type as the data.
                self._buffer = result[:0]
                return result
            # An empty result is replaced rather than extended, so the
            # initial empty buffer never forces its own type on the data.
            result = result + chunk if result else chunk
            cut = result_length(result)
        self._buffer = result[cut:]
        return result[:cut]

    def read_all(self):
        """Return everything that is left in the buffer and the iterable.

        >>> IterableFileBase(['This ', 'is ', 'a ', 'test.']).read_all()
        'This is a test.'
        """
        def no_stop(result):
            return None
        return self._read(no_stop)

    def push_back(self, contents):
        """Put ``contents`` back so that the next read returns it first.

        >>> f = IterableFileBase(['Th\\nis ', 'is \\n', 'a ', 'te\\nst.'])
        >>> f.read_to('\\n')
        'Th\\n'
        >>> f.push_back("Sh")
        >>> f.read_all()
        'Shis is \\na te\\nst.'
        """
        if self._buffer:
            self._buffer = contents + self._buffer
        else:
            self._buffer = contents
101
class IterableFile(object):
    """This class supplies all File methods that can be implemented cheaply.

    It wraps an iterable of data chunks in an IterableFileBase and exposes
    the read-only part of the file protocol on top of it: read, readline,
    readlines, line iteration, close, flush and context-manager support.
    Every operation except close() raises ValueError once the file has been
    closed.
    """

    def __init__(self, iterable):
        object.__init__(self)
        self._file_base = IterableFileBase(iterable)
        self._iter = self._make_iterator()
        self._closed = False
        # File attribute used by the Python 2 print statement.
        self.softspace = 0

    def _make_iterator(self):
        """Yield the remaining data one line at a time."""
        while not self._file_base.done:
            self._check_closed()
            result = self._file_base.read_to(b'\n')
            # The read that hits the end of the data may come back empty.
            if result:
                yield result

    def _check_closed(self):
        """Raise ValueError if the file has been closed."""
        if self.closed:
            raise ValueError("File is closed.")

    def close(self):
        """Mark the file closed and stop any further reading.

        >>> f = IterableFile(['This ', 'is ', 'a ', 'test.'])
        >>> f.closed
        False
        >>> f.close()
        >>> f.closed
        True
        """
        self._file_base.done = True
        self._closed = True

    @property
    def closed(self):
        """True once close() has been called."""
        return self._closed

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        # If there was an error raised, prefer the original one
        try:
            self.close()
        except BaseException:
            if exc_type is None:
                raise
        return False

    def flush(self):
        """No-op for standard compliance.

        >>> f = IterableFile([])
        >>> f.close()
        >>> f.flush()
        Traceback (most recent call last):
        ValueError: File is closed.
        """
        self._check_closed()

    def __next__(self):
        """Implementation of the iterator protocol's next()

        >>> f = IterableFile(['This \\n', 'is ', 'a ', 'test.'])
        >>> next(f)
        'This \\n'
        >>> f.close()
        >>> next(f)
        Traceback (most recent call last):
        ValueError: File is closed.
        >>> f = IterableFile(['This \\n', 'is ', 'a ', 'test.\\n'])
        >>> next(f)
        'This \\n'
        >>> next(f)
        'is a test.\\n'
        >>> next(f)
        Traceback (most recent call last):
        StopIteration
        """
        self._check_closed()
        return next(self._iter)

    # Python 2 spelling of the iterator protocol.
    next = __next__

    def __iter__(self):
        """Iterate over the lines of the file.

        >>> list(IterableFile(['Th\\nis ', 'is \\n', 'a ', 'te\\nst.']))
        ['Th\\n', 'is is \\n', 'a te\\n', 'st.']
        >>> f = IterableFile(['Th\\nis ', 'is \\n', 'a ', 'te\\nst.'])
        >>> f.close()
        >>> list(f)
        Traceback (most recent call last):
        ValueError: File is closed.
        """
        return self

    def read(self, length=None):
        """Read ``length`` items, or everything when ``length`` is omitted.

        As with standard file objects, a negative ``length`` also means
        "read everything that is left".

        >>> IterableFile(['This ', 'is ', 'a ', 'test.']).read()
        'This is a test.'
        >>> f = IterableFile(['This ', 'is ', 'a ', 'test.'])
        >>> f.read(10)
        'This is a '
        >>> f = IterableFile(['This ', 'is ', 'a ', 'test.'])
        >>> f.close()
        >>> f.read(10)
        Traceback (most recent call last):
        ValueError: File is closed.
        """
        self._check_closed()
        if length is None or length < 0:
            return self._file_base.read_all()
        return self._file_base.read_n(length)

    def read_to(self, sequence, size=None):
        """
        Read characters until a sequence is found, with optional max size.
        The specified sequence, if found, will be included in the result

        >>> f = IterableFile(['Th\\nis ', 'is \\n', 'a ', 'te\\nst.'])
        >>> f.read_to('i')
        'Th\\ni'
        >>> f.read_to('i')
        's i'
        >>> f.close()
        >>> f.read_to('i')
        Traceback (most recent call last):
        ValueError: File is closed.
        """
        self._check_closed()
        return self._file_base.read_to(sequence, size)

    def readline(self, size=None):
        """Read one line, including its trailing newline when present.

        ``size`` caps the length of the result; None or a negative value
        means no cap, as with standard file objects.

        >>> f = IterableFile(['Th\\nis ', 'is \\n', 'a ', 'te\\nst.'])
        >>> f.readline()
        'Th\\n'
        >>> f.readline(4)
        'is i'
        >>> f.close()
        >>> f.readline()
        Traceback (most recent call last):
        ValueError: File is closed.
        """
        if size is not None and size < 0:
            size = None
        return self.read_to(b'\n', size)

    def readlines(self, sizehint=None):
        """Return the remaining lines as a list.

        With ``sizehint``, lines are collected only while each one is
        shorter than the remaining budget. The first line that does not fit
        is pushed back for the next read, so the result can be empty before
        the end of the data is reached.

        >>> f = IterableFile(['Th\\nis ', 'is \\n', 'a ', 'te\\nst.'])
        >>> f.readlines()
        ['Th\\n', 'is is \\n', 'a te\\n', 'st.']
        >>> f = IterableFile(['Th\\nis ', 'is \\n', 'a ', 'te\\nst.'])
        >>> f.close()
        >>> f.readlines()
        Traceback (most recent call last):
        ValueError: File is closed.
        """
        lines = []
        while True:
            line = self.readline()
            if not line:
                # An empty read marks the end of the data, whether the
                # chunks are text or bytes.
                return lines
            if sizehint is None:
                lines.append(line)
            elif len(line) < sizehint:
                lines.append(line)
                sizehint -= len(line)
            else:
                self._file_base.push_back(line)
                return lines
271
if __name__ == "__main__":