1
 
# Copyright (C) 2005 Aaron Bentley, Canonical Ltd
 
2
 
# <aaron.bentley@utoronto.ca>
 
4
 
# This program is free software; you can redistribute it and/or modify
 
5
 
# it under the terms of the GNU General Public License as published by
 
6
 
# the Free Software Foundation; either version 2 of the License, or
 
7
 
# (at your option) any later version.
 
9
 
# This program is distributed in the hope that it will be useful,
 
10
 
# but WITHOUT ANY WARRANTY; without even the implied warranty of
 
11
 
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
12
 
# GNU General Public License for more details.
 
14
 
# You should have received a copy of the GNU General Public License
 
15
 
# along with this program; if not, write to the Free Software
 
16
 
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
 
19
 
class IterableFileBase(object):
 
20
 
    """Create a file-like object from any iterable"""
 
22
 
    def __init__(self, iterable):
 
24
 
        self._iter = iterable.__iter__()
 
28
 
    def read_n(self, length):
 
30
 
        >>> IterableFileBase(['This ', 'is ', 'a ', 'test.']).read_n(8)
 
33
 
        def test_length(result):
 
34
 
            if len(result) >= length:
 
38
 
        return self._read(test_length)
 
40
 
    def read_to(self, sequence, length=None):
 
42
 
        >>> f = IterableFileBase(['Th\\nis ', 'is \\n', 'a ', 'te\\nst.'])
 
48
 
        def test_contents(result):
 
49
 
            if length is not None:
 
50
 
                if len(result) >= length:
 
53
 
                return result.index(sequence)+len(sequence)
 
56
 
        return self._read(test_contents)
 
58
 
    def _read(self, result_length):
 
60
 
        Read data until result satisfies the condition result_length.
 
61
 
        result_length is a callable that returns None until the condition
 
62
 
        is satisfied, and returns the length of the result to use when
 
63
 
        the condition is satisfied.  (i.e. it returns the length of the
 
64
 
        subset of the first condition match.)
 
67
 
        while result_length(result) is None:
 
69
 
                result += self._iter.next()
 
74
 
        output_length = result_length(result)
 
75
 
        self._buffer = result[output_length:]
 
76
 
        return result[:output_length]
 
80
 
        >>> IterableFileBase(['This ', 'is ', 'a ', 'test.']).read_all()
 
85
 
        return self._read(no_stop)
 
88
 
    def push_back(self, contents):
 
90
 
        >>> f = IterableFileBase(['Th\\nis ', 'is \\n', 'a ', 'te\\nst.'])
 
95
 
        'Shis is \\na te\\nst.'
 
97
 
        self._buffer = contents + self._buffer
 
100
 
class IterableFile(object):
 
101
 
    """This class supplies all File methods that can be implemented cheaply."""
 
102
 
    def __init__(self, iterable):
 
103
 
        object.__init__(self)
 
104
 
        self._file_base = IterableFileBase(iterable)
 
105
 
        self._iter = self._make_iterator()
 
109
 
    def _make_iterator(self):
 
110
 
        while not self._file_base.done:
 
112
 
            result = self._file_base.read_to('\n')
 
116
 
    def _check_closed(self):
 
118
 
            raise ValueError("File is closed.")
 
122
 
        >>> f = IterableFile(['This ', 'is ', 'a ', 'test.'])
 
129
 
        self._file_base.done = True
 
132
 
    closed = property(lambda x: x._closed)
 
135
 
        """No-op for standard compliance.
 
136
 
        >>> f = IterableFile([])
 
139
 
        Traceback (most recent call last):
 
140
 
        ValueError: File is closed.
 
145
 
        """Implementation of the iterator protocol's next()
 
147
 
        >>> f = IterableFile(['This \\n', 'is ', 'a ', 'test.'])
 
152
 
        Traceback (most recent call last):
 
153
 
        ValueError: File is closed.
 
154
 
        >>> f = IterableFile(['This \\n', 'is ', 'a ', 'test.\\n'])
 
160
 
        Traceback (most recent call last):
 
164
 
        return self._iter.next()
 
168
 
        >>> list(IterableFile(['Th\\nis ', 'is \\n', 'a ', 'te\\nst.']))
 
169
 
        ['Th\\n', 'is is \\n', 'a te\\n', 'st.']
 
170
 
        >>> f = IterableFile(['Th\\nis ', 'is \\n', 'a ', 'te\\nst.'])
 
173
 
        Traceback (most recent call last):
 
174
 
        ValueError: File is closed.
 
178
 
    def read(self, length=None):
 
180
 
        >>> IterableFile(['This ', 'is ', 'a ', 'test.']).read()
 
182
 
        >>> f = IterableFile(['This ', 'is ', 'a ', 'test.'])
 
185
 
        >>> f = IterableFile(['This ', 'is ', 'a ', 'test.'])
 
188
 
        Traceback (most recent call last):
 
189
 
        ValueError: File is closed.
 
193
 
            return self._file_base.read_all()
 
195
 
            return self._file_base.read_n(length)
 
197
 
    def read_to(self, sequence, size=None):
 
199
 
        Read characters until a sequence is found, with optional max size.
 
200
 
        The specified sequence, if found, will be included in the result
 
202
 
        >>> f = IterableFile(['Th\\nis ', 'is \\n', 'a ', 'te\\nst.'])
 
209
 
        Traceback (most recent call last):
 
210
 
        ValueError: File is closed.
 
213
 
        return self._file_base.read_to(sequence, size)
 
215
 
    def readline(self, size=None):
 
217
 
        >>> f = IterableFile(['Th\\nis ', 'is \\n', 'a ', 'te\\nst.'])
 
224
 
        Traceback (most recent call last):
 
225
 
        ValueError: File is closed.
 
227
 
        return self.read_to('\n', size)
 
229
 
    def readlines(self, sizehint=None):
 
231
 
        >>> f = IterableFile(['Th\\nis ', 'is \\n', 'a ', 'te\\nst.'])
 
233
 
        ['Th\\n', 'is is \\n', 'a te\\n', 'st.']
 
234
 
        >>> f = IterableFile(['Th\\nis ', 'is \\n', 'a ', 'te\\nst.'])
 
237
 
        Traceback (most recent call last):
 
238
 
        ValueError: File is closed.
 
242
 
            line = self.readline()
 
247
 
            elif len(line) < sizehint:
 
249
 
                sizehint -= len(line)
 
251
 
                self._file_base.push_back(line)
 
255
 
if __name__ == "__main__":