1
# Copyright (C) 2005 Aaron Bentley
 
 
2
# <aaron.bentley@utoronto.ca>
 
 
4
#    This program is free software; you can redistribute it and/or modify
 
 
5
#    it under the terms of the GNU General Public License as published by
 
 
6
#    the Free Software Foundation; either version 2 of the License, or
 
 
7
#    (at your option) any later version.
 
 
9
#    This program is distributed in the hope that it will be useful,
 
 
10
#    but WITHOUT ANY WARRANTY; without even the implied warranty of
 
 
11
#    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
 
12
#    GNU General Public License for more details.
 
 
14
#    You should have received a copy of the GNU General Public License
 
 
15
#    along with this program; if not, write to the Free Software
 
 
16
#    Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
 
 
18
class IterableFileBase(object):
 
 
19
    """Create a file-like object from any iterable"""
 
 
21
    def __init__(self, iterable):
 
 
23
        self._iter = iterable.__iter__()
 
 
27
    def read_n(self, length):
 
 
29
        >>> IterableFileBase(['This ', 'is ', 'a ', 'test.']).read_n(8)
 
 
32
        def test_length(result):
 
 
33
            if len(result) >= length:
 
 
37
        return self._read(test_length)
 
 
39
    def read_to(self, sequence, length=None):
 
 
41
        >>> f = IterableFileBase(['Th\\nis ', 'is \\n', 'a ', 'te\\nst.'])
 
 
47
        def test_contents(result):
 
 
48
            if length is not None:
 
 
49
                if len(result) >= length:
 
 
52
                return result.index(sequence)+len(sequence)
 
 
55
        return self._read(test_contents)
 
 
57
    def _read(self, result_length):
 
 
59
        Read data until result satisfies the condition result_length.
 
 
60
        result_length is a callable that returns None until the condition
 
 
61
        is satisfied, and returns the length of the result to use when
 
 
62
        the condition is satisfied.  (i.e. it returns the length of the
 
 
63
        subset of the first condition match.)
 
 
66
        while result_length(result) is None:
 
 
68
                result += self._iter.next()
 
 
73
        output_length = result_length(result)
 
 
74
        self._buffer = result[output_length:]
 
 
75
        return result[:output_length]
 
 
79
        >>> IterableFileBase(['This ', 'is ', 'a ', 'test.']).read_all()
 
 
84
        return self._read(no_stop)
 
 
87
    def push_back(self, contents):
 
 
89
        >>> f = IterableFileBase(['Th\\nis ', 'is \\n', 'a ', 'te\\nst.'])
 
 
94
        'Shis is \\na te\\nst.'
 
 
96
        self._buffer = contents + self._buffer
 
 
99
class IterableFile(object):
 
 
100
    """This class supplies all File methods that can be implemented cheaply."""
 
 
101
    def __init__(self, iterable):
 
 
102
        object.__init__(self)
 
 
103
        self._file_base = IterableFileBase(iterable)
 
 
104
        self._iter = self._make_iterator()
 
 
108
    def _make_iterator(self):
 
 
109
        while not self._file_base.done:
 
 
111
            result = self._file_base.read_to('\n')
 
 
115
    def _check_closed(self):
 
 
117
            raise ValueError("File is closed.")
 
 
121
        >>> f = IterableFile(['This ', 'is ', 'a ', 'test.'])
 
 
128
        self._file_base.done = True
 
 
131
    closed = property(lambda x: x._closed)
 
 
134
        """No-op for standard compliance.
 
 
135
        >>> f = IterableFile([])
 
 
138
        Traceback (most recent call last):
 
 
139
        ValueError: File is closed.
 
 
144
        """Implementation of the iterator protocol's next()
 
 
146
        >>> f = IterableFile(['This \\n', 'is ', 'a ', 'test.'])
 
 
151
        Traceback (most recent call last):
 
 
152
        ValueError: File is closed.
 
 
153
        >>> f = IterableFile(['This \\n', 'is ', 'a ', 'test.\\n'])
 
 
159
        Traceback (most recent call last):
 
 
163
        return self._iter.next()
 
 
167
        >>> list(IterableFile(['Th\\nis ', 'is \\n', 'a ', 'te\\nst.']))
 
 
168
        ['Th\\n', 'is is \\n', 'a te\\n', 'st.']
 
 
169
        >>> f = IterableFile(['Th\\nis ', 'is \\n', 'a ', 'te\\nst.'])
 
 
172
        Traceback (most recent call last):
 
 
173
        ValueError: File is closed.
 
 
177
    def read(self, length=None):
 
 
179
        >>> IterableFile(['This ', 'is ', 'a ', 'test.']).read()
 
 
181
        >>> f = IterableFile(['This ', 'is ', 'a ', 'test.'])
 
 
184
        >>> f = IterableFile(['This ', 'is ', 'a ', 'test.'])
 
 
187
        Traceback (most recent call last):
 
 
188
        ValueError: File is closed.
 
 
192
            return self._file_base.read_all()
 
 
194
            return self._file_base.read_n(length)
 
 
196
    def read_to(self, sequence, size=None):
 
 
198
        Read characters until a sequence is found, with optional max size.
 
 
199
        The specified sequence, if found, will be included in the result
 
 
201
        >>> f = IterableFile(['Th\\nis ', 'is \\n', 'a ', 'te\\nst.'])
 
 
208
        Traceback (most recent call last):
 
 
209
        ValueError: File is closed.
 
 
212
        return self._file_base.read_to(sequence, size)
 
 
214
    def readline(self, size=None):
 
 
216
        >>> f = IterableFile(['Th\\nis ', 'is \\n', 'a ', 'te\\nst.'])
 
 
223
        Traceback (most recent call last):
 
 
224
        ValueError: File is closed.
 
 
226
        return self.read_to('\n', size)
 
 
228
    def readlines(self, sizehint=None):
 
 
230
        >>> f = IterableFile(['Th\\nis ', 'is \\n', 'a ', 'te\\nst.'])
 
 
232
        ['Th\\n', 'is is \\n', 'a te\\n', 'st.']
 
 
233
        >>> f = IterableFile(['Th\\nis ', 'is \\n', 'a ', 'te\\nst.'])
 
 
236
        Traceback (most recent call last):
 
 
237
        ValueError: File is closed.
 
 
241
            line = self.readline()
 
 
246
            elif len(line) < sizehint:
 
 
248
                sizehint -= len(line)
 
 
250
                self._file_base.push_back(line)
 
 
254
if __name__ == "__main__":