1
# Copyright (C) 2005 Aaron Bentley
 
 
2
# <aaron.bentley@utoronto.ca>
 
 
4
#    This program is free software; you can redistribute it and/or modify
 
 
5
#    it under the terms of the GNU General Public License as published by
 
 
6
#    the Free Software Foundation; either version 2 of the License, or
 
 
7
#    (at your option) any later version.
 
 
9
#    This program is distributed in the hope that it will be useful,
 
 
10
#    but WITHOUT ANY WARRANTY; without even the implied warranty of
 
 
11
#    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
 
12
#    GNU General Public License for more details.
 
 
14
#    You should have received a copy of the GNU General Public License
 
 
15
#    along with this program; if not, write to the Free Software
 
 
16
#    Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
 
 
20
class IterableFileBase(object):
 
 
21
    """Create a file-like object from any iterable"""
 
 
23
    def __init__(self, iterable):
 
 
25
        self._iter = iterable.__iter__()
 
 
29
    def read_n(self, length):
 
 
31
        >>> IterableFileBase(['This ', 'is ', 'a ', 'test.']).read_n(8)
 
 
34
        def test_length(result):
 
 
35
            if len(result) >= length:
 
 
39
        return self._read(test_length)
 
 
41
    def read_to(self, sequence, length=None):
 
 
43
        >>> f = IterableFileBase(['Th\\nis ', 'is \\n', 'a ', 'te\\nst.'])
 
 
49
        def test_contents(result):
 
 
50
            if length is not None:
 
 
51
                if len(result) >= length:
 
 
54
                return result.index(sequence)+len(sequence)
 
 
57
        return self._read(test_contents)
 
 
59
    def _read(self, result_length):
 
 
61
        Read data until result satisfies the condition result_length.
 
 
62
        result_length is a callable that returns None until the condition
 
 
63
        is satisfied, and returns the length of the result to use when
 
 
64
        the condition is satisfied.  (i.e. it returns the length of the
 
 
65
        subset of the first condition match.)
 
 
68
        while result_length(result) is None:
 
 
70
                result += self._iter.next()
 
 
75
        output_length = result_length(result)
 
 
76
        self._buffer = result[output_length:]
 
 
77
        return result[:output_length]
 
 
81
        >>> IterableFileBase(['This ', 'is ', 'a ', 'test.']).read_all()
 
 
86
        return self._read(no_stop)
 
 
89
    def push_back(self, contents):
 
 
91
        >>> f = IterableFileBase(['Th\\nis ', 'is \\n', 'a ', 'te\\nst.'])
 
 
96
        'Shis is \\na te\\nst.'
 
 
98
        self._buffer = contents + self._buffer
 
 
101
class IterableFile(object):
 
 
102
    """This class supplies all File methods that can be implemented cheaply."""
 
 
103
    def __init__(self, iterable):
 
 
104
        object.__init__(self)
 
 
105
        self._file_base = IterableFileBase(iterable)
 
 
106
        self._iter = self._make_iterator()
 
 
110
    def _make_iterator(self):
 
 
111
        while not self._file_base.done:
 
 
113
            result = self._file_base.read_to('\n')
 
 
117
    def _check_closed(self):
 
 
119
            raise ValueError("File is closed.")
 
 
123
        >>> f = IterableFile(['This ', 'is ', 'a ', 'test.'])
 
 
130
        self._file_base.done = True
 
 
133
    closed = property(lambda x: x._closed)
 
 
136
        """No-op for standard compliance.
 
 
137
        >>> f = IterableFile([])
 
 
140
        Traceback (most recent call last):
 
 
141
        ValueError: File is closed.
 
 
146
        """Implementation of the iterator protocol's next()
 
 
148
        >>> f = IterableFile(['This \\n', 'is ', 'a ', 'test.'])
 
 
153
        Traceback (most recent call last):
 
 
154
        ValueError: File is closed.
 
 
155
        >>> f = IterableFile(['This \\n', 'is ', 'a ', 'test.\\n'])
 
 
161
        Traceback (most recent call last):
 
 
165
        return self._iter.next()
 
 
169
        >>> list(IterableFile(['Th\\nis ', 'is \\n', 'a ', 'te\\nst.']))
 
 
170
        ['Th\\n', 'is is \\n', 'a te\\n', 'st.']
 
 
171
        >>> f = IterableFile(['Th\\nis ', 'is \\n', 'a ', 'te\\nst.'])
 
 
174
        Traceback (most recent call last):
 
 
175
        ValueError: File is closed.
 
 
179
    def read(self, length=None):
 
 
181
        >>> IterableFile(['This ', 'is ', 'a ', 'test.']).read()
 
 
183
        >>> f = IterableFile(['This ', 'is ', 'a ', 'test.'])
 
 
186
        >>> f = IterableFile(['This ', 'is ', 'a ', 'test.'])
 
 
189
        Traceback (most recent call last):
 
 
190
        ValueError: File is closed.
 
 
194
            return self._file_base.read_all()
 
 
196
            return self._file_base.read_n(length)
 
 
198
    def read_to(self, sequence, size=None):
 
 
200
        Read characters until a sequence is found, with optional max size.
 
 
201
        The specified sequence, if found, will be included in the result
 
 
203
        >>> f = IterableFile(['Th\\nis ', 'is \\n', 'a ', 'te\\nst.'])
 
 
210
        Traceback (most recent call last):
 
 
211
        ValueError: File is closed.
 
 
214
        return self._file_base.read_to(sequence, size)
 
 
216
    def readline(self, size=None):
 
 
218
        >>> f = IterableFile(['Th\\nis ', 'is \\n', 'a ', 'te\\nst.'])
 
 
225
        Traceback (most recent call last):
 
 
226
        ValueError: File is closed.
 
 
228
        return self.read_to('\n', size)
 
 
230
    def readlines(self, sizehint=None):
 
 
232
        >>> f = IterableFile(['Th\\nis ', 'is \\n', 'a ', 'te\\nst.'])
 
 
234
        ['Th\\n', 'is is \\n', 'a te\\n', 'st.']
 
 
235
        >>> f = IterableFile(['Th\\nis ', 'is \\n', 'a ', 'te\\nst.'])
 
 
238
        Traceback (most recent call last):
 
 
239
        ValueError: File is closed.
 
 
243
            line = self.readline()
 
 
248
            elif len(line) < sizehint:
 
 
250
                sizehint -= len(line)
 
 
252
                self._file_base.push_back(line)
 
 
256
if __name__ == "__main__":