/brz/remove-bazaar

To get this branch, use:
bzr branch http://gegoxaren.bato24.eu/bzr/brz/remove-bazaar

« back to all changes in this revision

Viewing changes to breezy/iterablefile.py

  • Committer: Robert Collins
  • Date: 2005-12-24 02:20:45 UTC
  • mto: (1185.50.57 bzr-jam-integration)
  • mto: This revision was merged to the branch mainline in revision 1550.
  • Revision ID: robertc@robertcollins.net-20051224022045-14efc8dfa0e1a4e9
Start tests for api usage.

Show diffs side-by-side

added added

removed removed

Lines of Context:
1
 
# Copyright (C) 2005 Aaron Bentley, Canonical Ltd
2
 
# <aaron.bentley@utoronto.ca>
3
 
#
4
 
# This program is free software; you can redistribute it and/or modify
5
 
# it under the terms of the GNU General Public License as published by
6
 
# the Free Software Foundation; either version 2 of the License, or
7
 
# (at your option) any later version.
8
 
#
9
 
# This program is distributed in the hope that it will be useful,
10
 
# but WITHOUT ANY WARRANTY; without even the implied warranty of
11
 
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12
 
# GNU General Public License for more details.
13
 
#
14
 
# You should have received a copy of the GNU General Public License
15
 
# along with this program; if not, write to the Free Software
16
 
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
17
 
 
18
 
 
19
 
class IterableFileBase(object):
20
 
    """Create a file-like object from any iterable"""
21
 
 
22
 
    def __init__(self, iterable):
23
 
        object.__init__(self)
24
 
        self._iter = iterable.__iter__()
25
 
        self._buffer = b""
26
 
        self.done = False
27
 
 
28
 
    def read_n(self, length):
29
 
        """
30
 
        >>> IterableFileBase([b'This ', b'is ', b'a ', b'test.']).read_n(8)
31
 
        b'This is '
32
 
        """
33
 
        def test_length(result):
34
 
            if len(result) >= length:
35
 
                return length
36
 
            else:
37
 
                return None
38
 
        return self._read(test_length)
39
 
 
40
 
    def read_to(self, sequence, length=None):
41
 
        """
42
 
        >>> f = IterableFileBase([b'Th\\nis ', b'is \\n', b'a ', b'te\\nst.'])
43
 
        >>> f.read_to(b'\\n')
44
 
        b'Th\\n'
45
 
        >>> f.read_to(b'\\n')
46
 
        b'is is \\n'
47
 
        """
48
 
        def test_contents(result):
49
 
            if length is not None:
50
 
                if len(result) >= length:
51
 
                    return length
52
 
            try:
53
 
                return result.index(sequence) + len(sequence)
54
 
            except ValueError:
55
 
                return None
56
 
        return self._read(test_contents)
57
 
 
58
 
    def _read(self, result_length):
59
 
        """
60
 
        Read data until result satisfies the condition result_length.
61
 
        result_length is a callable that returns None until the condition
62
 
        is satisfied, and returns the length of the result to use when
63
 
        the condition is satisfied.  (i.e. it returns the length of the
64
 
        subset of the first condition match.)
65
 
        """
66
 
        result = self._buffer
67
 
        while result_length(result) is None:
68
 
            try:
69
 
                result += next(self._iter)
70
 
            except StopIteration:
71
 
                self.done = True
72
 
                self._buffer = b""
73
 
                return result
74
 
        output_length = result_length(result)
75
 
        self._buffer = result[output_length:]
76
 
        return result[:output_length]
77
 
 
78
 
    def read_all(self):
79
 
        """
80
 
        >>> IterableFileBase([b'This ', b'is ', b'a ', b'test.']).read_all()
81
 
        b'This is a test.'
82
 
        """
83
 
        def no_stop(result):
84
 
            return None
85
 
        return self._read(no_stop)
86
 
 
87
 
    def push_back(self, contents):
88
 
        """
89
 
        >>> f = IterableFileBase([b'Th\\nis ', b'is \\n', b'a ', b'te\\nst.'])
90
 
        >>> f.read_to(b'\\n')
91
 
        b'Th\\n'
92
 
        >>> f.push_back(b"Sh")
93
 
        >>> f.read_all()
94
 
        b'Shis is \\na te\\nst.'
95
 
        """
96
 
        self._buffer = contents + self._buffer
97
 
 
98
 
 
99
 
class IterableFile(object):
100
 
    """This class supplies all File methods that can be implemented cheaply."""
101
 
 
102
 
    def __init__(self, iterable):
103
 
        object.__init__(self)
104
 
        self._file_base = IterableFileBase(iterable)
105
 
        self._iter = self._make_iterator()
106
 
        self._closed = False
107
 
        self.softspace = 0
108
 
 
109
 
    def _make_iterator(self):
110
 
        while not self._file_base.done:
111
 
            self._check_closed()
112
 
            result = self._file_base.read_to(b'\n')
113
 
            if result != b'':
114
 
                yield result
115
 
 
116
 
    def _check_closed(self):
117
 
        if self.closed:
118
 
            raise ValueError("File is closed.")
119
 
 
120
 
    def close(self):
121
 
        """
122
 
        >>> f = IterableFile([b'This ', b'is ', b'a ', b'test.'])
123
 
        >>> f.closed
124
 
        False
125
 
        >>> f.close()
126
 
        >>> f.closed
127
 
        True
128
 
        """
129
 
        self._file_base.done = True
130
 
        self._closed = True
131
 
 
132
 
    closed = property(lambda x: x._closed)
133
 
 
134
 
    def __enter__(self):
135
 
        return self
136
 
 
137
 
    def __exit__(self, exc_type, exc_val, exc_tb):
138
 
        # If there was an error raised, prefer the original one
139
 
        try:
140
 
            self.close()
141
 
        except BaseException:
142
 
            if exc_type is None:
143
 
                raise
144
 
        return False
145
 
 
146
 
    def flush(self):
147
 
        """No-op for standard compliance.
148
 
        >>> f = IterableFile([])
149
 
        >>> f.close()
150
 
        >>> f.flush()
151
 
        Traceback (most recent call last):
152
 
        ValueError: File is closed.
153
 
        """
154
 
        self._check_closed()
155
 
 
156
 
    def __next__(self):
157
 
        """Implementation of the iterator protocol's next()
158
 
 
159
 
        >>> f = IterableFile([b'This \\n', b'is ', b'a ', b'test.'])
160
 
        >>> next(f)
161
 
        b'This \\n'
162
 
        >>> f.close()
163
 
        >>> next(f)
164
 
        Traceback (most recent call last):
165
 
        ValueError: File is closed.
166
 
        >>> f = IterableFile([b'This \\n', b'is ', b'a ', b'test.\\n'])
167
 
        >>> next(f)
168
 
        b'This \\n'
169
 
        >>> next(f)
170
 
        b'is a test.\\n'
171
 
        >>> next(f)
172
 
        Traceback (most recent call last):
173
 
        StopIteration
174
 
        """
175
 
        self._check_closed()
176
 
        return next(self._iter)
177
 
 
178
 
    next = __next__
179
 
 
180
 
    def __iter__(self):
181
 
        """
182
 
        >>> list(IterableFile([b'Th\\nis ', b'is \\n', b'a ', b'te\\nst.']))
183
 
        [b'Th\\n', b'is is \\n', b'a te\\n', b'st.']
184
 
        >>> f = IterableFile([b'Th\\nis ', b'is \\n', b'a ', b'te\\nst.'])
185
 
        >>> f.close()
186
 
        >>> list(f)
187
 
        Traceback (most recent call last):
188
 
        ValueError: File is closed.
189
 
        """
190
 
        return self
191
 
 
192
 
    def read(self, length=None):
193
 
        """
194
 
        >>> IterableFile([b'This ', b'is ', b'a ', b'test.']).read()
195
 
        b'This is a test.'
196
 
        >>> f = IterableFile([b'This ', b'is ', b'a ', b'test.'])
197
 
        >>> f.read(10)
198
 
        b'This is a '
199
 
        >>> f = IterableFile([b'This ', b'is ', b'a ', b'test.'])
200
 
        >>> f.close()
201
 
        >>> f.read(10)
202
 
        Traceback (most recent call last):
203
 
        ValueError: File is closed.
204
 
        """
205
 
        self._check_closed()
206
 
        if length is None:
207
 
            return self._file_base.read_all()
208
 
        else:
209
 
            return self._file_base.read_n(length)
210
 
 
211
 
    def read_to(self, sequence, size=None):
212
 
        """
213
 
        Read characters until a sequence is found, with optional max size.
214
 
        The specified sequence, if found, will be included in the result
215
 
 
216
 
        >>> f = IterableFile([b'Th\\nis ', b'is \\n', b'a ', b'te\\nst.'])
217
 
        >>> f.read_to(b'i')
218
 
        b'Th\\ni'
219
 
        >>> f.read_to(b'i')
220
 
        b's i'
221
 
        >>> f.close()
222
 
        >>> f.read_to(b'i')
223
 
        Traceback (most recent call last):
224
 
        ValueError: File is closed.
225
 
        """
226
 
        self._check_closed()
227
 
        return self._file_base.read_to(sequence, size)
228
 
 
229
 
    def readline(self, size=None):
230
 
        """
231
 
        >>> f = IterableFile([b'Th\\nis ', b'is \\n', b'a ', b'te\\nst.'])
232
 
        >>> f.readline()
233
 
        b'Th\\n'
234
 
        >>> f.readline(4)
235
 
        b'is i'
236
 
        >>> f.close()
237
 
        >>> f.readline()
238
 
        Traceback (most recent call last):
239
 
        ValueError: File is closed.
240
 
        """
241
 
        return self.read_to(b'\n', size)
242
 
 
243
 
    def readlines(self, sizehint=None):
244
 
        """
245
 
        >>> f = IterableFile([b'Th\\nis ', b'is \\n', b'a ', b'te\\nst.'])
246
 
        >>> f.readlines()
247
 
        [b'Th\\n', b'is is \\n', b'a te\\n', b'st.']
248
 
        >>> f = IterableFile([b'Th\\nis ', b'is \\n', b'a ', b'te\\nst.'])
249
 
        >>> f.close()
250
 
        >>> f.readlines()
251
 
        Traceback (most recent call last):
252
 
        ValueError: File is closed.
253
 
        """
254
 
        lines = []
255
 
        while True:
256
 
            line = self.readline()
257
 
            if line == b"":
258
 
                return lines
259
 
            if sizehint is None:
260
 
                lines.append(line)
261
 
            elif len(line) < sizehint:
262
 
                lines.append(line)
263
 
                sizehint -= len(line)
264
 
            else:
265
 
                self._file_base.push_back(line)
266
 
                return lines
267
 
 
268
 
 
269
 
if __name__ == "__main__":
270
 
    import doctest
271
 
    doctest.testmod()