/brz/remove-bazaar

To get this branch, use:
bzr branch http://gegoxaren.bato24.eu/bzr/brz/remove-bazaar

« back to all changes in this revision

Viewing changes to breezy/iterablefile.py

  • Committer: Robert Collins
  • Date: 2005-10-19 10:11:57 UTC
  • mfrom: (1185.16.78)
  • mto: This revision was merged to the branch mainline in revision 1470.
  • Revision ID: robertc@robertcollins.net-20051019101157-17438d311e746b4f
mergeĀ fromĀ upstream

Show diffs side-by-side

added added

removed removed

Lines of Context:
1
 
# Copyright (C) 2005 Aaron Bentley, Canonical Ltd
2
 
# <aaron.bentley@utoronto.ca>
3
 
#
4
 
# This program is free software; you can redistribute it and/or modify
5
 
# it under the terms of the GNU General Public License as published by
6
 
# the Free Software Foundation; either version 2 of the License, or
7
 
# (at your option) any later version.
8
 
#
9
 
# This program is distributed in the hope that it will be useful,
10
 
# but WITHOUT ANY WARRANTY; without even the implied warranty of
11
 
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12
 
# GNU General Public License for more details.
13
 
#
14
 
# You should have received a copy of the GNU General Public License
15
 
# along with this program; if not, write to the Free Software
16
 
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
17
 
 
18
 
from __future__ import absolute_import
19
 
 
20
 
 
21
 
class IterableFileBase(object):
22
 
    """Create a file-like object from any iterable"""
23
 
 
24
 
    def __init__(self, iterable):
25
 
        object.__init__(self)
26
 
        self._iter = iterable.__iter__()
27
 
        self._buffer = b""
28
 
        self.done = False
29
 
 
30
 
    def read_n(self, length):
31
 
        """
32
 
        >>> IterableFileBase(['This ', 'is ', 'a ', 'test.']).read_n(8)
33
 
        'This is '
34
 
        """
35
 
        def test_length(result):
36
 
            if len(result) >= length:
37
 
                return length
38
 
            else:
39
 
                return None
40
 
        return self._read(test_length)
41
 
 
42
 
    def read_to(self, sequence, length=None):
43
 
        """
44
 
        >>> f = IterableFileBase(['Th\\nis ', 'is \\n', 'a ', 'te\\nst.'])
45
 
        >>> f.read_to('\\n')
46
 
        'Th\\n'
47
 
        >>> f.read_to('\\n')
48
 
        'is is \\n'
49
 
        """
50
 
        def test_contents(result):
51
 
            if length is not None:
52
 
                if len(result) >= length:
53
 
                    return length
54
 
            try:
55
 
                return result.index(sequence) + len(sequence)
56
 
            except ValueError:
57
 
                return None
58
 
        return self._read(test_contents)
59
 
 
60
 
    def _read(self, result_length):
61
 
        """
62
 
        Read data until result satisfies the condition result_length.
63
 
        result_length is a callable that returns None until the condition
64
 
        is satisfied, and returns the length of the result to use when
65
 
        the condition is satisfied.  (i.e. it returns the length of the
66
 
        subset of the first condition match.)
67
 
        """
68
 
        result = self._buffer
69
 
        while result_length(result) is None:
70
 
            try:
71
 
                result += next(self._iter)
72
 
            except StopIteration:
73
 
                self.done = True
74
 
                self._buffer = b""
75
 
                return result
76
 
        output_length = result_length(result)
77
 
        self._buffer = result[output_length:]
78
 
        return result[:output_length]
79
 
 
80
 
    def read_all(self):
81
 
        """
82
 
        >>> IterableFileBase(['This ', 'is ', 'a ', 'test.']).read_all()
83
 
        'This is a test.'
84
 
        """
85
 
        def no_stop(result):
86
 
            return None
87
 
        return self._read(no_stop)
88
 
 
89
 
    def push_back(self, contents):
90
 
        """
91
 
        >>> f = IterableFileBase(['Th\\nis ', 'is \\n', 'a ', 'te\\nst.'])
92
 
        >>> f.read_to('\\n')
93
 
        'Th\\n'
94
 
        >>> f.push_back("Sh")
95
 
        >>> f.read_all()
96
 
        'Shis is \\na te\\nst.'
97
 
        """
98
 
        self._buffer = contents + self._buffer
99
 
 
100
 
 
101
 
class IterableFile(object):
102
 
    """This class supplies all File methods that can be implemented cheaply."""
103
 
 
104
 
    def __init__(self, iterable):
105
 
        object.__init__(self)
106
 
        self._file_base = IterableFileBase(iterable)
107
 
        self._iter = self._make_iterator()
108
 
        self._closed = False
109
 
        self.softspace = 0
110
 
 
111
 
    def _make_iterator(self):
112
 
        while not self._file_base.done:
113
 
            self._check_closed()
114
 
            result = self._file_base.read_to(b'\n')
115
 
            if result != b'':
116
 
                yield result
117
 
 
118
 
    def _check_closed(self):
119
 
        if self.closed:
120
 
            raise ValueError("File is closed.")
121
 
 
122
 
    def close(self):
123
 
        """
124
 
        >>> f = IterableFile(['This ', 'is ', 'a ', 'test.'])
125
 
        >>> f.closed
126
 
        False
127
 
        >>> f.close()
128
 
        >>> f.closed
129
 
        True
130
 
        """
131
 
        self._file_base.done = True
132
 
        self._closed = True
133
 
 
134
 
    closed = property(lambda x: x._closed)
135
 
 
136
 
    def __enter__(self):
137
 
        return self
138
 
 
139
 
    def __exit__(self, exc_type, exc_val, exc_tb):
140
 
        # If there was an error raised, prefer the original one
141
 
        try:
142
 
            self.close()
143
 
        except BaseException:
144
 
            if exc_type is None:
145
 
                raise
146
 
        return False
147
 
 
148
 
    def flush(self):
149
 
        """No-op for standard compliance.
150
 
        >>> f = IterableFile([])
151
 
        >>> f.close()
152
 
        >>> f.flush()
153
 
        Traceback (most recent call last):
154
 
        ValueError: File is closed.
155
 
        """
156
 
        self._check_closed()
157
 
 
158
 
    def __next__(self):
159
 
        """Implementation of the iterator protocol's next()
160
 
 
161
 
        >>> f = IterableFile(['This \\n', 'is ', 'a ', 'test.'])
162
 
        >>> next(f)
163
 
        'This \\n'
164
 
        >>> f.close()
165
 
        >>> next(f)
166
 
        Traceback (most recent call last):
167
 
        ValueError: File is closed.
168
 
        >>> f = IterableFile(['This \\n', 'is ', 'a ', 'test.\\n'])
169
 
        >>> next(f)
170
 
        'This \\n'
171
 
        >>> next(f)
172
 
        'is a test.\\n'
173
 
        >>> next(f)
174
 
        Traceback (most recent call last):
175
 
        StopIteration
176
 
        """
177
 
        self._check_closed()
178
 
        return next(self._iter)
179
 
 
180
 
    next = __next__
181
 
 
182
 
    def __iter__(self):
183
 
        """
184
 
        >>> list(IterableFile(['Th\\nis ', 'is \\n', 'a ', 'te\\nst.']))
185
 
        ['Th\\n', 'is is \\n', 'a te\\n', 'st.']
186
 
        >>> f = IterableFile(['Th\\nis ', 'is \\n', 'a ', 'te\\nst.'])
187
 
        >>> f.close()
188
 
        >>> list(f)
189
 
        Traceback (most recent call last):
190
 
        ValueError: File is closed.
191
 
        """
192
 
        return self
193
 
 
194
 
    def read(self, length=None):
195
 
        """
196
 
        >>> IterableFile(['This ', 'is ', 'a ', 'test.']).read()
197
 
        'This is a test.'
198
 
        >>> f = IterableFile(['This ', 'is ', 'a ', 'test.'])
199
 
        >>> f.read(10)
200
 
        'This is a '
201
 
        >>> f = IterableFile(['This ', 'is ', 'a ', 'test.'])
202
 
        >>> f.close()
203
 
        >>> f.read(10)
204
 
        Traceback (most recent call last):
205
 
        ValueError: File is closed.
206
 
        """
207
 
        self._check_closed()
208
 
        if length is None:
209
 
            return self._file_base.read_all()
210
 
        else:
211
 
            return self._file_base.read_n(length)
212
 
 
213
 
    def read_to(self, sequence, size=None):
214
 
        """
215
 
        Read characters until a sequence is found, with optional max size.
216
 
        The specified sequence, if found, will be included in the result
217
 
 
218
 
        >>> f = IterableFile(['Th\\nis ', 'is \\n', 'a ', 'te\\nst.'])
219
 
        >>> f.read_to('i')
220
 
        'Th\\ni'
221
 
        >>> f.read_to('i')
222
 
        's i'
223
 
        >>> f.close()
224
 
        >>> f.read_to('i')
225
 
        Traceback (most recent call last):
226
 
        ValueError: File is closed.
227
 
        """
228
 
        self._check_closed()
229
 
        return self._file_base.read_to(sequence, size)
230
 
 
231
 
    def readline(self, size=None):
232
 
        """
233
 
        >>> f = IterableFile(['Th\\nis ', 'is \\n', 'a ', 'te\\nst.'])
234
 
        >>> f.readline()
235
 
        'Th\\n'
236
 
        >>> f.readline(4)
237
 
        'is i'
238
 
        >>> f.close()
239
 
        >>> f.readline()
240
 
        Traceback (most recent call last):
241
 
        ValueError: File is closed.
242
 
        """
243
 
        return self.read_to(b'\n', size)
244
 
 
245
 
    def readlines(self, sizehint=None):
246
 
        """
247
 
        >>> f = IterableFile(['Th\\nis ', 'is \\n', 'a ', 'te\\nst.'])
248
 
        >>> f.readlines()
249
 
        ['Th\\n', 'is is \\n', 'a te\\n', 'st.']
250
 
        >>> f = IterableFile(['Th\\nis ', 'is \\n', 'a ', 'te\\nst.'])
251
 
        >>> f.close()
252
 
        >>> f.readlines()
253
 
        Traceback (most recent call last):
254
 
        ValueError: File is closed.
255
 
        """
256
 
        lines = []
257
 
        while True:
258
 
            line = self.readline()
259
 
            if line == b"":
260
 
                return lines
261
 
            if sizehint is None:
262
 
                lines.append(line)
263
 
            elif len(line) < sizehint:
264
 
                lines.append(line)
265
 
                sizehint -= len(line)
266
 
            else:
267
 
                self._file_base.push_back(line)
268
 
                return lines
269
 
 
270
 
 
271
 
if __name__ == "__main__":
272
 
    import doctest
273
 
    doctest.testmod()