# Copyright (C) 2005 Aaron Bentley, Canonical Ltd
# <aaron.bentley@utoronto.ca>
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
18
from __future__ import absolute_import
20
class IterableFileBase(object):
    """Create a file-like object from any iterable.

    The iterable yields successive chunks of data.  Chunks are concatenated
    on demand into an internal buffer, from which the ``read_*`` methods
    carve their results.  Chunks are expected to be byte strings (plain
    ``str`` on Python 2), matching the ``b'\\n'`` separator used for line
    reading elsewhere in this module.

    Attributes:
        done: True once the underlying iterable has been exhausted.
    """

    def __init__(self, iterable):
        """Wrap *iterable*; nothing is consumed until the first read."""
        object.__init__(self)
        self._iter = iter(iterable)
        # Data already pulled from the iterable but not yet returned.
        self._buffer = b""
        self.done = False

    def read_n(self, length):
        """Return up to *length* bytes (fewer only at end of input).

        >>> f = IterableFileBase([b'This ', b'is ', b'a ', b'test.'])
        >>> f.read_n(8) == b'This is '
        True
        """
        def test_length(result):
            # Satisfied as soon as enough data has been accumulated.
            if len(result) >= length:
                return length
            return None
        return self._read(test_length)

    def read_to(self, sequence, length=None):
        """Read up to and including the first occurrence of *sequence*.

        :param sequence: terminator to search for; it is included in the
            returned data when found.
        :param length: optional cap on the size of the returned data.  The
            result never extends past the terminator, even when the cap is
            larger than the distance to it.
        :return: the data read; at end of input this may be shorter than
            *length* and may lack the terminator.

        >>> f = IterableFileBase([b'Th\\nis ', b'is \\n', b'a ', b'te\\nst.'])
        >>> f.read_to(b'\\n') == b'Th\\n'
        True
        >>> f.read_to(b'\\n') == b'is is \\n'
        True
        >>> f.read_to(b'\\n', 2) == b'a '
        True
        """
        def test_contents(result):
            try:
                end = result.index(sequence) + len(sequence)
            except ValueError:
                end = None
            # Prefer the terminator when it fits within the cap; checking
            # the cap first would return data beyond the terminator.
            if end is not None and (length is None or end <= length):
                return end
            if length is not None and len(result) >= length:
                return length
            return None
        return self._read(test_contents)

    def _read(self, result_length):
        """Read data until result satisfies the condition result_length.

        result_length is a callable that returns None until the condition
        is satisfied, and returns the length of the result to use when
        the condition is satisfied.  (i.e. it returns the length of the
        subset of the first condition match.)

        If the iterable runs out first, ``done`` is set and everything
        accumulated so far is returned.
        """
        result = self._buffer
        while result_length(result) is None:
            try:
                result += next(self._iter)
            except StopIteration:
                self.done = True
                self._buffer = b""
                return result
        output_length = result_length(result)
        # Keep the surplus for the next read.
        self._buffer = result[output_length:]
        return result[:output_length]

    def read_all(self):
        """Return all remaining data.

        >>> f = IterableFileBase([b'This ', b'is ', b'a ', b'test.'])
        >>> f.read_all() == b'This is a test.'
        True
        """
        def no_stop(result):
            # Never satisfied: forces _read to drain the iterable.
            return None
        return self._read(no_stop)

    def push_back(self, contents):
        """Put *contents* back so that it is returned by the next read.

        >>> f = IterableFileBase([b'Th\\nis ', b'is \\n', b'a ', b'te\\nst.'])
        >>> f.read_to(b'\\n') == b'Th\\n'
        True
        >>> f.push_back(b"Sh")
        >>> f.read_all() == b'Shis is \\na te\\nst.'
        True
        """
        self._buffer = contents + self._buffer
101
class IterableFile(object):
    """This class supplies all File methods that can be implemented cheaply.

    It is a read-only, file-like view over an iterable of byte-string
    chunks.  Buffering is delegated to an ``IterableFileBase``; this class
    adds the closed-state checks, line iteration and the usual ``read*``
    entry points.  Every reading method raises ``ValueError`` once the file
    has been closed.
    """

    def __init__(self, iterable):
        """Wrap *iterable*, which yields successive chunks of data."""
        object.__init__(self)
        self._file_base = IterableFileBase(iterable)
        self._iter = self._make_iterator()
        self._closed = False
        # Legacy file-object attribute (used by Python 2 ``print``).
        self.softspace = 0

    def _make_iterator(self):
        """Yield the remaining data one line at a time."""
        while not self._file_base.done:
            self._check_closed()
            result = self._file_base.read_to(b'\n')
            # The final read at end of input may be empty; do not yield it.
            if result:
                yield result

    def _check_closed(self):
        """Raise ValueError if the file has been closed."""
        if self.closed:
            raise ValueError("File is closed.")

    def close(self):
        """Mark the file closed; later reads raise ValueError.

        >>> f = IterableFile([b'This ', b'is ', b'a ', b'test.'])
        >>> f.closed
        False
        >>> f.close()
        >>> f.closed
        True
        """
        self._file_base.done = True
        self._closed = True

    closed = property(lambda x: x._closed)

    def flush(self):
        """No-op for standard compliance.

        >>> f = IterableFile([])
        >>> f.close()
        >>> f.flush()
        Traceback (most recent call last):
        ValueError: File is closed.
        """
        self._check_closed()

    def __next__(self):
        """Implementation of the iterator protocol's next()

        >>> f = IterableFile([b'This \\n', b'is ', b'a ', b'test.'])
        >>> next(f) == b'This \\n'
        True
        >>> f.close()
        >>> next(f)
        Traceback (most recent call last):
        ValueError: File is closed.
        >>> f = IterableFile([b'This \\n', b'is ', b'a ', b'test.\\n'])
        >>> next(f) == b'This \\n'
        True
        >>> next(f) == b'is a test.\\n'
        True
        >>> next(f)
        Traceback (most recent call last):
        StopIteration
        """
        self._check_closed()
        return next(self._iter)

    # Python 2 spelling of the iterator protocol.
    next = __next__

    def __iter__(self):
        """Return self; iteration yields lines.

        >>> lines = list(IterableFile([b'Th\\nis ', b'is \\n', b'a ', b'te\\nst.']))
        >>> lines == [b'Th\\n', b'is is \\n', b'a te\\n', b'st.']
        True
        >>> f = IterableFile([b'Th\\nis ', b'is \\n', b'a ', b'te\\nst.'])
        >>> f.close()
        >>> list(f)
        Traceback (most recent call last):
        ValueError: File is closed.
        """
        return self

    def read(self, length=None):
        """Read up to *length* bytes, or everything that remains.

        :param length: maximum number of bytes to return.  ``None`` or a
            negative value reads to end of input, as with standard files.

        >>> IterableFile([b'This ', b'is ', b'a ', b'test.']).read() == b'This is a test.'
        True
        >>> f = IterableFile([b'This ', b'is ', b'a ', b'test.'])
        >>> f.read(10) == b'This is a '
        True
        >>> f = IterableFile([b'This ', b'is ', b'a ', b'test.'])
        >>> f.close()
        >>> f.read(10)
        Traceback (most recent call last):
        ValueError: File is closed.
        """
        self._check_closed()
        if length is None or length < 0:
            return self._file_base.read_all()
        return self._file_base.read_n(length)

    def read_to(self, sequence, size=None):
        """Read characters until a sequence is found, with optional max size.

        The specified sequence, if found, will be included in the result

        >>> f = IterableFile([b'Th\\nis ', b'is \\n', b'a ', b'te\\nst.'])
        >>> f.read_to(b'i') == b'Th\\ni'
        True
        >>> f.read_to(b'i') == b's i'
        True
        >>> f.close()
        >>> f.read_to(b'i')
        Traceback (most recent call last):
        ValueError: File is closed.
        """
        self._check_closed()
        return self._file_base.read_to(sequence, size)

    def readline(self, size=None):
        """Read one line, including its trailing newline when present.

        :param size: optional cap on the number of bytes returned; ``None``
            or a negative value means no cap, as with standard files.

        >>> f = IterableFile([b'Th\\nis ', b'is \\n', b'a ', b'te\\nst.'])
        >>> f.readline() == b'Th\\n'
        True
        >>> f.readline(4) == b'is i'
        True
        >>> f.close()
        >>> f.readline()
        Traceback (most recent call last):
        ValueError: File is closed.
        """
        if size is not None and size < 0:
            size = None
        return self.read_to(b'\n', size)

    def readlines(self, sizehint=None):
        """Return the remaining lines as a list.

        :param sizehint: when given, lines are collected only while each one
            is shorter than the remaining budget; the first line that does
            not fit is pushed back for a later read.

        >>> f = IterableFile([b'Th\\nis ', b'is \\n', b'a ', b'te\\nst.'])
        >>> f.readlines() == [b'Th\\n', b'is is \\n', b'a te\\n', b'st.']
        True
        >>> f = IterableFile([b'Th\\nis ', b'is \\n', b'a ', b'te\\nst.'])
        >>> f.close()
        >>> f.readlines()
        Traceback (most recent call last):
        ValueError: File is closed.
        """
        lines = []
        while True:
            line = self.readline()
            # Truthiness rather than comparison with "" so that an empty
            # byte string is recognised as end of input on Python 3 too.
            if not line:
                return lines
            if sizehint is None:
                lines.append(line)
            elif len(line) < sizehint:
                lines.append(line)
                sizehint -= len(line)
            else:
                self._file_base.push_back(line)
                return lines
258
if __name__ == "__main__":