/brz/remove-bazaar

To get this branch, use:
bzr branch http://gegoxaren.bato24.eu/bzr/brz/remove-bazaar
0.5.92 by Aaron Bentley
added IterableFile class
1
# Copyright (C) 2005 Aaron Bentley
2
# <aaron.bentley@utoronto.ca>
3
#
4
#    This program is free software; you can redistribute it and/or modify
5
#    it under the terms of the GNU General Public License as published by
6
#    the Free Software Foundation; either version 2 of the License, or
7
#    (at your option) any later version.
8
#
9
#    This program is distributed in the hope that it will be useful,
10
#    but WITHOUT ANY WARRANTY; without even the implied warranty of
11
#    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12
#    GNU General Public License for more details.
13
#
14
#    You should have received a copy of the GNU General Public License
15
#    along with this program; if not, write to the Free Software
16
#    Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
17
18
import doctest
19
20
class IterableFileBase(object):
21
    """Create a file-like object from any iterable"""
22
    def __init__(self, iterable):
23
        object.__init__(self)
24
        self._iter = iterable.__iter__()
25
        self._buffer = ""
26
        self.done = False
27
28
    def read_n(self, length):
29
        """
30
        >>> IterableFileBase(['This ', 'is ', 'a ', 'test.']).read_n(8)
31
        'This is '
32
        """
33
        def test_length(result):
34
            if len(result) >= length:
35
                return length
36
            else:
37
                return None
38
        return self._read(test_length)
39
40
    def read_to(self, sequence, length=None):
41
        """
42
        >>> f = IterableFileBase(['Th\\nis ', 'is \\n', 'a ', 'te\\nst.'])
43
        >>> f.read_to('\\n')
44
        'Th\\n'
45
        >>> f.read_to('\\n')
46
        'is is \\n'
47
        """
48
        def test_contents(result):
49
            if length is not None:
50
                if len(result) >= length:
51
                    return length
52
            try:
53
                return result.index(sequence)+len(sequence)
54
            except ValueError:
55
                return None
56
        return self._read(test_contents)
57
58
    def _read(self, result_length):
59
        """
60
        Read data until result satisfies the condition result_length.
61
        result_length is a callable that returns None until the condition
62
        is satisfied, and returns the length of the result to use when
63
        the condition is satisfied.  (i.e. it returns the length of the
64
        subset of the first condition match.)
65
        """
66
        result = self._buffer
67
        while result_length(result) is None:
68
            try:
69
                result += self._iter.next()
70
            except StopIteration:
71
                self.done = True
72
                self._buffer = ""
73
                return result
74
        output_length = result_length(result)
75
        self._buffer = result[output_length:]
76
        return result[:output_length]
77
78
    def read_all(self):
79
        """
80
        >>> IterableFileBase(['This ', 'is ', 'a ', 'test.']).read_all()
81
        'This is a test.'
82
        """
83
        def no_stop(result):
84
            return None
85
        return self._read(no_stop)
86
87
88
    def push_back(self, contents):
89
        """
90
        >>> f = IterableFileBase(['Th\\nis ', 'is \\n', 'a ', 'te\\nst.'])
91
        >>> f.read_to('\\n')
92
        'Th\\n'
93
        >>> f.push_back("Sh")
94
        >>> f.read_all()
95
        'Shis is \\na te\\nst.'
96
        """
97
        self._buffer = contents + self._buffer
98
99
100
class IterableFile(object):
101
    """This class supplies all File methods that can be implemented cheaply."""
102
    def __init__(self, iterable):
103
        object.__init__(self)
104
        self._file_base = IterableFileBase(iterable)
105
        self._iter = self._make_iterator()
106
        self._closed = False
107
        self.softspace = 0
108
109
    def _make_iterator(self):
110
        while not self._file_base.done:
111
            self._check_closed()
112
            yield self._file_base.read_to('\n')
113
114
    def _check_closed(self):
115
        if self.closed:
116
            raise ValueError("File is closed.")
117
118
    def close(self):
119
        """
120
        >>> f = IterableFile(['This ', 'is ', 'a ', 'test.'])
121
        >>> f.closed
122
        False
123
        >>> f.close()
124
        >>> f.closed
125
        True
126
        """
127
        self._file_base.done = True
128
        self._closed = True
129
130
    closed = property(lambda x: x._closed)
131
132
    def flush(self):
133
        """No-op for standard compliance.
134
        >>> f = IterableFile([])
135
        >>> f.close()
136
        >>> f.flush()
137
        Traceback (most recent call last):
138
        ValueError: File is closed.
139
        """
140
        self._check_closed()
141
142
    def next(self):
143
        """Implementation of the iterator protocol's next()
144
145
        >>> f = IterableFile(['This \\n', 'is ', 'a ', 'test.'])
146
        >>> f.next()
147
        'This \\n'
148
        >>> f.close()
149
        >>> f.next()
150
        Traceback (most recent call last):
151
        ValueError: File is closed.
152
        """
153
        self._check_closed()
154
        return self._iter.next()
155
156
    def __iter__(self):
157
        """
158
        >>> list(IterableFile(['Th\\nis ', 'is \\n', 'a ', 'te\\nst.']))
159
        ['Th\\n', 'is is \\n', 'a te\\n', 'st.']
160
        >>> f = IterableFile(['Th\\nis ', 'is \\n', 'a ', 'te\\nst.'])
161
        >>> f.close()
162
        >>> list(f)
163
        Traceback (most recent call last):
164
        ValueError: File is closed.
165
        """
166
        return self
167
168
    def read(self, length=None):
169
        """
170
        >>> IterableFile(['This ', 'is ', 'a ', 'test.']).read()
171
        'This is a test.'
172
        >>> f = IterableFile(['This ', 'is ', 'a ', 'test.'])
173
        >>> f.read(10)
174
        'This is a '
175
        >>> f = IterableFile(['This ', 'is ', 'a ', 'test.'])
176
        >>> f.close()
177
        >>> f.read(10)
178
        Traceback (most recent call last):
179
        ValueError: File is closed.
180
        """
181
        self._check_closed()
182
        if length is None:
183
            return self._file_base.read_all()
184
        else:
185
            return self._file_base.read_n(length)
186
187
    def read_to(self, sequence, size=None):
188
        """
189
        Read characters until a sequence is found, with optional max size.
190
        The specified sequence, if found, will be included in the result
191
192
        >>> f = IterableFile(['Th\\nis ', 'is \\n', 'a ', 'te\\nst.'])
193
        >>> f.read_to('i')
194
        'Th\\ni'
195
        >>> f.read_to('i')
196
        's i'
197
        >>> f.close()
198
        >>> f.read_to('i')
199
        Traceback (most recent call last):
200
        ValueError: File is closed.
201
        """
202
        self._check_closed()
203
        return self._file_base.read_to(sequence, size)
204
205
    def readline(self, size=None):
206
        """
207
        >>> f = IterableFile(['Th\\nis ', 'is \\n', 'a ', 'te\\nst.'])
208
        >>> f.readline()
209
        'Th\\n'
210
        >>> f.readline(4)
211
        'is i'
212
        >>> f.close()
213
        >>> f.readline()
214
        Traceback (most recent call last):
215
        ValueError: File is closed.
216
        """
217
        return self.read_to('\n', size)
218
219
    def readlines(self, sizehint=None):
220
        """
221
        >>> f = IterableFile(['Th\\nis ', 'is \\n', 'a ', 'te\\nst.'])
222
        >>> f.readlines()
223
        ['Th\\n', 'is is \\n', 'a te\\n', 'st.']
224
        >>> f = IterableFile(['Th\\nis ', 'is \\n', 'a ', 'te\\nst.'])
225
        >>> f.close()
226
        >>> f.readlines()
227
        Traceback (most recent call last):
228
        ValueError: File is closed.
229
        """
230
        lines = []
231
        while True:
232
            line = self.readline()
233
            if line == "":
234
                return lines
235
            if sizehint is None:
236
                lines.append(line)
237
            elif len(line) < sizehint:
238
                lines.append(line)
239
                sizehint -= len(line)
240
            else:
241
                self._file_base.push_back(line)
242
                return lines
243
244
        
245
if __name__ == "__main__":
246
    doctest.testmod()