/brz/remove-bazaar

To get this branch, use:
bzr branch http://gegoxaren.bato24.eu/bzr/brz/remove-bazaar

« back to all changes in this revision

Viewing changes to breezy/iterablefile.py

  • Committer: Jelmer Vernooij
  • Date: 2018-02-18 21:42:57 UTC
  • mto: This revision was merged to the branch mainline in revision 6859.
  • Revision ID: jelmer@jelmer.uk-20180218214257-jpevutp1wa30tz3v
Update TODO to reference Breezy, not Bazaar.

Show diffs side-by-side

added added

removed removed

Lines of Context:
15
15
# along with this program; if not, write to the Free Software
16
16
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
17
17
 
 
18
from __future__ import absolute_import
18
19
 
19
20
class IterableFileBase(object):
20
21
    """Create a file-like object from any iterable"""
27
28
 
28
29
    def read_n(self, length):
29
30
        """
30
 
        >>> IterableFileBase([b'This ', b'is ', b'a ', b'test.']).read_n(8)
31
 
        b'This is '
 
31
        >>> IterableFileBase(['This ', 'is ', 'a ', 'test.']).read_n(8)
 
32
        'This is '
32
33
        """
33
34
        def test_length(result):
34
35
            if len(result) >= length:
39
40
 
40
41
    def read_to(self, sequence, length=None):
41
42
        """
42
 
        >>> f = IterableFileBase([b'Th\\nis ', b'is \\n', b'a ', b'te\\nst.'])
43
 
        >>> f.read_to(b'\\n')
44
 
        b'Th\\n'
45
 
        >>> f.read_to(b'\\n')
46
 
        b'is is \\n'
 
43
        >>> f = IterableFileBase(['Th\\nis ', 'is \\n', 'a ', 'te\\nst.'])
 
44
        >>> f.read_to('\\n')
 
45
        'Th\\n'
 
46
        >>> f.read_to('\\n')
 
47
        'is is \\n'
47
48
        """
48
49
        def test_contents(result):
49
50
            if length is not None:
50
51
                if len(result) >= length:
51
52
                    return length
52
53
            try:
53
 
                return result.index(sequence) + len(sequence)
 
54
                return result.index(sequence)+len(sequence)
54
55
            except ValueError:
55
56
                return None
56
57
        return self._read(test_contents)
77
78
 
78
79
    def read_all(self):
79
80
        """
80
 
        >>> IterableFileBase([b'This ', b'is ', b'a ', b'test.']).read_all()
81
 
        b'This is a test.'
 
81
        >>> IterableFileBase(['This ', 'is ', 'a ', 'test.']).read_all()
 
82
        'This is a test.'
82
83
        """
83
84
        def no_stop(result):
84
85
            return None
85
86
        return self._read(no_stop)
86
87
 
 
88
 
87
89
    def push_back(self, contents):
88
90
        """
89
 
        >>> f = IterableFileBase([b'Th\\nis ', b'is \\n', b'a ', b'te\\nst.'])
90
 
        >>> f.read_to(b'\\n')
91
 
        b'Th\\n'
92
 
        >>> f.push_back(b"Sh")
 
91
        >>> f = IterableFileBase(['Th\\nis ', 'is \\n', 'a ', 'te\\nst.'])
 
92
        >>> f.read_to('\\n')
 
93
        'Th\\n'
 
94
        >>> f.push_back("Sh")
93
95
        >>> f.read_all()
94
 
        b'Shis is \\na te\\nst.'
 
96
        'Shis is \\na te\\nst.'
95
97
        """
96
98
        self._buffer = contents + self._buffer
97
99
 
98
100
 
99
101
class IterableFile(object):
100
102
    """This class supplies all File methods that can be implemented cheaply."""
101
 
 
102
103
    def __init__(self, iterable):
103
104
        object.__init__(self)
104
105
        self._file_base = IterableFileBase(iterable)
119
120
 
120
121
    def close(self):
121
122
        """
122
 
        >>> f = IterableFile([b'This ', b'is ', b'a ', b'test.'])
 
123
        >>> f = IterableFile(['This ', 'is ', 'a ', 'test.'])
123
124
        >>> f.closed
124
125
        False
125
126
        >>> f.close()
131
132
 
132
133
    closed = property(lambda x: x._closed)
133
134
 
134
 
    def __enter__(self):
135
 
        return self
136
 
 
137
 
    def __exit__(self, exc_type, exc_val, exc_tb):
138
 
        # If there was an error raised, prefer the original one
139
 
        try:
140
 
            self.close()
141
 
        except BaseException:
142
 
            if exc_type is None:
143
 
                raise
144
 
        return False
145
 
 
146
135
    def flush(self):
147
136
        """No-op for standard compliance.
148
137
        >>> f = IterableFile([])
156
145
    def __next__(self):
157
146
        """Implementation of the iterator protocol's next()
158
147
 
159
 
        >>> f = IterableFile([b'This \\n', b'is ', b'a ', b'test.'])
 
148
        >>> f = IterableFile(['This \\n', 'is ', 'a ', 'test.'])
160
149
        >>> next(f)
161
 
        b'This \\n'
 
150
        'This \\n'
162
151
        >>> f.close()
163
152
        >>> next(f)
164
153
        Traceback (most recent call last):
165
154
        ValueError: File is closed.
166
 
        >>> f = IterableFile([b'This \\n', b'is ', b'a ', b'test.\\n'])
167
 
        >>> next(f)
168
 
        b'This \\n'
169
 
        >>> next(f)
170
 
        b'is a test.\\n'
 
155
        >>> f = IterableFile(['This \\n', 'is ', 'a ', 'test.\\n'])
 
156
        >>> next(f)
 
157
        'This \\n'
 
158
        >>> next(f)
 
159
        'is a test.\\n'
171
160
        >>> next(f)
172
161
        Traceback (most recent call last):
173
162
        StopIteration
179
168
 
180
169
    def __iter__(self):
181
170
        """
182
 
        >>> list(IterableFile([b'Th\\nis ', b'is \\n', b'a ', b'te\\nst.']))
183
 
        [b'Th\\n', b'is is \\n', b'a te\\n', b'st.']
184
 
        >>> f = IterableFile([b'Th\\nis ', b'is \\n', b'a ', b'te\\nst.'])
 
171
        >>> list(IterableFile(['Th\\nis ', 'is \\n', 'a ', 'te\\nst.']))
 
172
        ['Th\\n', 'is is \\n', 'a te\\n', 'st.']
 
173
        >>> f = IterableFile(['Th\\nis ', 'is \\n', 'a ', 'te\\nst.'])
185
174
        >>> f.close()
186
175
        >>> list(f)
187
176
        Traceback (most recent call last):
191
180
 
192
181
    def read(self, length=None):
193
182
        """
194
 
        >>> IterableFile([b'This ', b'is ', b'a ', b'test.']).read()
195
 
        b'This is a test.'
196
 
        >>> f = IterableFile([b'This ', b'is ', b'a ', b'test.'])
 
183
        >>> IterableFile(['This ', 'is ', 'a ', 'test.']).read()
 
184
        'This is a test.'
 
185
        >>> f = IterableFile(['This ', 'is ', 'a ', 'test.'])
197
186
        >>> f.read(10)
198
 
        b'This is a '
199
 
        >>> f = IterableFile([b'This ', b'is ', b'a ', b'test.'])
 
187
        'This is a '
 
188
        >>> f = IterableFile(['This ', 'is ', 'a ', 'test.'])
200
189
        >>> f.close()
201
190
        >>> f.read(10)
202
191
        Traceback (most recent call last):
213
202
        Read characters until a sequence is found, with optional max size.
214
203
        The specified sequence, if found, will be included in the result
215
204
 
216
 
        >>> f = IterableFile([b'Th\\nis ', b'is \\n', b'a ', b'te\\nst.'])
217
 
        >>> f.read_to(b'i')
218
 
        b'Th\\ni'
219
 
        >>> f.read_to(b'i')
220
 
        b's i'
 
205
        >>> f = IterableFile(['Th\\nis ', 'is \\n', 'a ', 'te\\nst.'])
 
206
        >>> f.read_to('i')
 
207
        'Th\\ni'
 
208
        >>> f.read_to('i')
 
209
        's i'
221
210
        >>> f.close()
222
 
        >>> f.read_to(b'i')
 
211
        >>> f.read_to('i')
223
212
        Traceback (most recent call last):
224
213
        ValueError: File is closed.
225
214
        """
228
217
 
229
218
    def readline(self, size=None):
230
219
        """
231
 
        >>> f = IterableFile([b'Th\\nis ', b'is \\n', b'a ', b'te\\nst.'])
 
220
        >>> f = IterableFile(['Th\\nis ', 'is \\n', 'a ', 'te\\nst.'])
232
221
        >>> f.readline()
233
 
        b'Th\\n'
 
222
        'Th\\n'
234
223
        >>> f.readline(4)
235
 
        b'is i'
 
224
        'is i'
236
225
        >>> f.close()
237
226
        >>> f.readline()
238
227
        Traceback (most recent call last):
242
231
 
243
232
    def readlines(self, sizehint=None):
244
233
        """
245
 
        >>> f = IterableFile([b'Th\\nis ', b'is \\n', b'a ', b'te\\nst.'])
 
234
        >>> f = IterableFile(['Th\\nis ', 'is \\n', 'a ', 'te\\nst.'])
246
235
        >>> f.readlines()
247
 
        [b'Th\\n', b'is is \\n', b'a te\\n', b'st.']
248
 
        >>> f = IterableFile([b'Th\\nis ', b'is \\n', b'a ', b'te\\nst.'])
 
236
        ['Th\\n', 'is is \\n', 'a te\\n', 'st.']
 
237
        >>> f = IterableFile(['Th\\nis ', 'is \\n', 'a ', 'te\\nst.'])
249
238
        >>> f.close()
250
239
        >>> f.readlines()
251
240
        Traceback (most recent call last):