1
# Copyright (C) 2005 Aaron Bentley, Canonical Ltd
 
 
2
# <aaron.bentley@utoronto.ca>
 
 
4
# This program is free software; you can redistribute it and/or modify
 
 
5
# it under the terms of the GNU General Public License as published by
 
 
6
# the Free Software Foundation; either version 2 of the License, or
 
 
7
# (at your option) any later version.
 
 
9
# This program is distributed in the hope that it will be useful,
 
 
10
# but WITHOUT ANY WARRANTY; without even the implied warranty of
 
 
11
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
 
12
# GNU General Public License for more details.
 
 
14
# You should have received a copy of the GNU General Public License
 
 
15
# along with this program; if not, write to the Free Software
 
 
16
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
 
 
19
class IterableFileBase(object):
 
 
20
    """Create a file-like object from any iterable"""
 
 
22
    def __init__(self, iterable):
 
 
24
        self._iter = iterable.__iter__()
 
 
28
    def read_n(self, length):
 
 
30
        >>> IterableFileBase(['This ', 'is ', 'a ', 'test.']).read_n(8)
 
 
33
        def test_length(result):
 
 
34
            if len(result) >= length:
 
 
38
        return self._read(test_length)
 
 
40
    def read_to(self, sequence, length=None):
 
 
42
        >>> f = IterableFileBase(['Th\\nis ', 'is \\n', 'a ', 'te\\nst.'])
 
 
48
        def test_contents(result):
 
 
49
            if length is not None:
 
 
50
                if len(result) >= length:
 
 
53
                return result.index(sequence)+len(sequence)
 
 
56
        return self._read(test_contents)
 
 
58
    def _read(self, result_length):
 
 
60
        Read data until result satisfies the condition result_length.
 
 
61
        result_length is a callable that returns None until the condition
 
 
62
        is satisfied, and returns the length of the result to use when
 
 
63
        the condition is satisfied.  (i.e. it returns the length of the
 
 
64
        subset of the first condition match.)
 
 
67
        while result_length(result) is None:
 
 
69
                result += self._iter.next()
 
 
74
        output_length = result_length(result)
 
 
75
        self._buffer = result[output_length:]
 
 
76
        return result[:output_length]
 
 
80
        >>> IterableFileBase(['This ', 'is ', 'a ', 'test.']).read_all()
 
 
85
        return self._read(no_stop)
 
 
88
    def push_back(self, contents):
 
 
90
        >>> f = IterableFileBase(['Th\\nis ', 'is \\n', 'a ', 'te\\nst.'])
 
 
95
        'Shis is \\na te\\nst.'
 
 
97
        self._buffer = contents + self._buffer
 
 
100
class IterableFile(object):
 
 
101
    """This class supplies all File methods that can be implemented cheaply."""
 
 
102
    def __init__(self, iterable):
 
 
103
        object.__init__(self)
 
 
104
        self._file_base = IterableFileBase(iterable)
 
 
105
        self._iter = self._make_iterator()
 
 
109
    def _make_iterator(self):
 
 
110
        while not self._file_base.done:
 
 
112
            result = self._file_base.read_to('\n')
 
 
116
    def _check_closed(self):
 
 
118
            raise ValueError("File is closed.")
 
 
122
        >>> f = IterableFile(['This ', 'is ', 'a ', 'test.'])
 
 
129
        self._file_base.done = True
 
 
132
    closed = property(lambda x: x._closed)
 
 
135
        """No-op for standard compliance.
 
 
136
        >>> f = IterableFile([])
 
 
139
        Traceback (most recent call last):
 
 
140
        ValueError: File is closed.
 
 
145
        """Implementation of the iterator protocol's next()
 
 
147
        >>> f = IterableFile(['This \\n', 'is ', 'a ', 'test.'])
 
 
152
        Traceback (most recent call last):
 
 
153
        ValueError: File is closed.
 
 
154
        >>> f = IterableFile(['This \\n', 'is ', 'a ', 'test.\\n'])
 
 
160
        Traceback (most recent call last):
 
 
164
        return self._iter.next()
 
 
168
        >>> list(IterableFile(['Th\\nis ', 'is \\n', 'a ', 'te\\nst.']))
 
 
169
        ['Th\\n', 'is is \\n', 'a te\\n', 'st.']
 
 
170
        >>> f = IterableFile(['Th\\nis ', 'is \\n', 'a ', 'te\\nst.'])
 
 
173
        Traceback (most recent call last):
 
 
174
        ValueError: File is closed.
 
 
178
    def read(self, length=None):
 
 
180
        >>> IterableFile(['This ', 'is ', 'a ', 'test.']).read()
 
 
182
        >>> f = IterableFile(['This ', 'is ', 'a ', 'test.'])
 
 
185
        >>> f = IterableFile(['This ', 'is ', 'a ', 'test.'])
 
 
188
        Traceback (most recent call last):
 
 
189
        ValueError: File is closed.
 
 
193
            return self._file_base.read_all()
 
 
195
            return self._file_base.read_n(length)
 
 
197
    def read_to(self, sequence, size=None):
 
 
199
        Read characters until a sequence is found, with optional max size.
 
 
200
        The specified sequence, if found, will be included in the result
 
 
202
        >>> f = IterableFile(['Th\\nis ', 'is \\n', 'a ', 'te\\nst.'])
 
 
209
        Traceback (most recent call last):
 
 
210
        ValueError: File is closed.
 
 
213
        return self._file_base.read_to(sequence, size)
 
 
215
    def readline(self, size=None):
 
 
217
        >>> f = IterableFile(['Th\\nis ', 'is \\n', 'a ', 'te\\nst.'])
 
 
224
        Traceback (most recent call last):
 
 
225
        ValueError: File is closed.
 
 
227
        return self.read_to('\n', size)
 
 
229
    def readlines(self, sizehint=None):
 
 
231
        >>> f = IterableFile(['Th\\nis ', 'is \\n', 'a ', 'te\\nst.'])
 
 
233
        ['Th\\n', 'is is \\n', 'a te\\n', 'st.']
 
 
234
        >>> f = IterableFile(['Th\\nis ', 'is \\n', 'a ', 'te\\nst.'])
 
 
237
        Traceback (most recent call last):
 
 
238
        ValueError: File is closed.
 
 
242
            line = self.readline()
 
 
247
            elif len(line) < sizehint:
 
 
249
                sizehint -= len(line)
 
 
251
                self._file_base.push_back(line)
 
 
255
if __name__ == "__main__":