/brz/remove-bazaar

To get this branch, use:
bzr branch http://gegoxaren.bato24.eu/bzr/brz/remove-bazaar

« back to all changes in this revision

Viewing changes to breezy/iterablefile.py

  • Committer: Jelmer Vernooij
  • Date: 2020-05-06 02:13:25 UTC
  • mfrom: (7490.7.21 work)
  • mto: This revision was merged to the branch mainline in revision 7501.
  • Revision ID: jelmer@jelmer.uk-20200506021325-awbmmqu1zyorz7sj
Merge 3.1 branch.

Show diffs side-by-side

added added

removed removed

Lines of Context:
15
15
# along with this program; if not, write to the Free Software
16
16
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
17
17
 
18
 
from __future__ import absolute_import
19
18
 
20
19
class IterableFileBase(object):
21
20
    """Create a file-like object from any iterable"""
28
27
 
29
28
    def read_n(self, length):
30
29
        """
31
 
        >>> IterableFileBase(['This ', 'is ', 'a ', 'test.']).read_n(8)
32
 
        'This is '
 
30
        >>> IterableFileBase([b'This ', b'is ', b'a ', b'test.']).read_n(8)
 
31
        b'This is '
33
32
        """
34
33
        def test_length(result):
35
34
            if len(result) >= length:
40
39
 
41
40
    def read_to(self, sequence, length=None):
42
41
        """
43
 
        >>> f = IterableFileBase(['Th\\nis ', 'is \\n', 'a ', 'te\\nst.'])
44
 
        >>> f.read_to('\\n')
45
 
        'Th\\n'
46
 
        >>> f.read_to('\\n')
47
 
        'is is \\n'
 
42
        >>> f = IterableFileBase([b'Th\\nis ', b'is \\n', b'a ', b'te\\nst.'])
 
43
        >>> f.read_to(b'\\n')
 
44
        b'Th\\n'
 
45
        >>> f.read_to(b'\\n')
 
46
        b'is is \\n'
48
47
        """
49
48
        def test_contents(result):
50
49
            if length is not None:
51
50
                if len(result) >= length:
52
51
                    return length
53
52
            try:
54
 
                return result.index(sequence)+len(sequence)
 
53
                return result.index(sequence) + len(sequence)
55
54
            except ValueError:
56
55
                return None
57
56
        return self._read(test_contents)
78
77
 
79
78
    def read_all(self):
80
79
        """
81
 
        >>> IterableFileBase(['This ', 'is ', 'a ', 'test.']).read_all()
82
 
        'This is a test.'
 
80
        >>> IterableFileBase([b'This ', b'is ', b'a ', b'test.']).read_all()
 
81
        b'This is a test.'
83
82
        """
84
83
        def no_stop(result):
85
84
            return None
86
85
        return self._read(no_stop)
87
86
 
88
 
 
89
87
    def push_back(self, contents):
90
88
        """
91
 
        >>> f = IterableFileBase(['Th\\nis ', 'is \\n', 'a ', 'te\\nst.'])
92
 
        >>> f.read_to('\\n')
93
 
        'Th\\n'
94
 
        >>> f.push_back("Sh")
 
89
        >>> f = IterableFileBase([b'Th\\nis ', b'is \\n', b'a ', b'te\\nst.'])
 
90
        >>> f.read_to(b'\\n')
 
91
        b'Th\\n'
 
92
        >>> f.push_back(b"Sh")
95
93
        >>> f.read_all()
96
 
        'Shis is \\na te\\nst.'
 
94
        b'Shis is \\na te\\nst.'
97
95
        """
98
96
        self._buffer = contents + self._buffer
99
97
 
100
98
 
101
99
class IterableFile(object):
102
100
    """This class supplies all File methods that can be implemented cheaply."""
 
101
 
103
102
    def __init__(self, iterable):
104
103
        object.__init__(self)
105
104
        self._file_base = IterableFileBase(iterable)
120
119
 
121
120
    def close(self):
122
121
        """
123
 
        >>> f = IterableFile(['This ', 'is ', 'a ', 'test.'])
 
122
        >>> f = IterableFile([b'This ', b'is ', b'a ', b'test.'])
124
123
        >>> f.closed
125
124
        False
126
125
        >>> f.close()
132
131
 
133
132
    closed = property(lambda x: x._closed)
134
133
 
 
134
    def __enter__(self):
 
135
        return self
 
136
 
 
137
    def __exit__(self, exc_type, exc_val, exc_tb):
 
138
        # If there was an error raised, prefer the original one
 
139
        try:
 
140
            self.close()
 
141
        except BaseException:
 
142
            if exc_type is None:
 
143
                raise
 
144
        return False
 
145
 
135
146
    def flush(self):
136
147
        """No-op for standard compliance.
137
148
        >>> f = IterableFile([])
145
156
    def __next__(self):
146
157
        """Implementation of the iterator protocol's next()
147
158
 
148
 
        >>> f = IterableFile(['This \\n', 'is ', 'a ', 'test.'])
 
159
        >>> f = IterableFile([b'This \\n', b'is ', b'a ', b'test.'])
149
160
        >>> next(f)
150
 
        'This \\n'
 
161
        b'This \\n'
151
162
        >>> f.close()
152
163
        >>> next(f)
153
164
        Traceback (most recent call last):
154
165
        ValueError: File is closed.
155
 
        >>> f = IterableFile(['This \\n', 'is ', 'a ', 'test.\\n'])
156
 
        >>> next(f)
157
 
        'This \\n'
158
 
        >>> next(f)
159
 
        'is a test.\\n'
 
166
        >>> f = IterableFile([b'This \\n', b'is ', b'a ', b'test.\\n'])
 
167
        >>> next(f)
 
168
        b'This \\n'
 
169
        >>> next(f)
 
170
        b'is a test.\\n'
160
171
        >>> next(f)
161
172
        Traceback (most recent call last):
162
173
        StopIteration
168
179
 
169
180
    def __iter__(self):
170
181
        """
171
 
        >>> list(IterableFile(['Th\\nis ', 'is \\n', 'a ', 'te\\nst.']))
172
 
        ['Th\\n', 'is is \\n', 'a te\\n', 'st.']
173
 
        >>> f = IterableFile(['Th\\nis ', 'is \\n', 'a ', 'te\\nst.'])
 
182
        >>> list(IterableFile([b'Th\\nis ', b'is \\n', b'a ', b'te\\nst.']))
 
183
        [b'Th\\n', b'is is \\n', b'a te\\n', b'st.']
 
184
        >>> f = IterableFile([b'Th\\nis ', b'is \\n', b'a ', b'te\\nst.'])
174
185
        >>> f.close()
175
186
        >>> list(f)
176
187
        Traceback (most recent call last):
180
191
 
181
192
    def read(self, length=None):
182
193
        """
183
 
        >>> IterableFile(['This ', 'is ', 'a ', 'test.']).read()
184
 
        'This is a test.'
185
 
        >>> f = IterableFile(['This ', 'is ', 'a ', 'test.'])
 
194
        >>> IterableFile([b'This ', b'is ', b'a ', b'test.']).read()
 
195
        b'This is a test.'
 
196
        >>> f = IterableFile([b'This ', b'is ', b'a ', b'test.'])
186
197
        >>> f.read(10)
187
 
        'This is a '
188
 
        >>> f = IterableFile(['This ', 'is ', 'a ', 'test.'])
 
198
        b'This is a '
 
199
        >>> f = IterableFile([b'This ', b'is ', b'a ', b'test.'])
189
200
        >>> f.close()
190
201
        >>> f.read(10)
191
202
        Traceback (most recent call last):
202
213
        Read characters until a sequence is found, with optional max size.
203
214
        The specified sequence, if found, will be included in the result
204
215
 
205
 
        >>> f = IterableFile(['Th\\nis ', 'is \\n', 'a ', 'te\\nst.'])
206
 
        >>> f.read_to('i')
207
 
        'Th\\ni'
208
 
        >>> f.read_to('i')
209
 
        's i'
 
216
        >>> f = IterableFile([b'Th\\nis ', b'is \\n', b'a ', b'te\\nst.'])
 
217
        >>> f.read_to(b'i')
 
218
        b'Th\\ni'
 
219
        >>> f.read_to(b'i')
 
220
        b's i'
210
221
        >>> f.close()
211
 
        >>> f.read_to('i')
 
222
        >>> f.read_to(b'i')
212
223
        Traceback (most recent call last):
213
224
        ValueError: File is closed.
214
225
        """
217
228
 
218
229
    def readline(self, size=None):
219
230
        """
220
 
        >>> f = IterableFile(['Th\\nis ', 'is \\n', 'a ', 'te\\nst.'])
 
231
        >>> f = IterableFile([b'Th\\nis ', b'is \\n', b'a ', b'te\\nst.'])
221
232
        >>> f.readline()
222
 
        'Th\\n'
 
233
        b'Th\\n'
223
234
        >>> f.readline(4)
224
 
        'is i'
 
235
        b'is i'
225
236
        >>> f.close()
226
237
        >>> f.readline()
227
238
        Traceback (most recent call last):
231
242
 
232
243
    def readlines(self, sizehint=None):
233
244
        """
234
 
        >>> f = IterableFile(['Th\\nis ', 'is \\n', 'a ', 'te\\nst.'])
 
245
        >>> f = IterableFile([b'Th\\nis ', b'is \\n', b'a ', b'te\\nst.'])
235
246
        >>> f.readlines()
236
 
        ['Th\\n', 'is is \\n', 'a te\\n', 'st.']
237
 
        >>> f = IterableFile(['Th\\nis ', 'is \\n', 'a ', 'te\\nst.'])
 
247
        [b'Th\\n', b'is is \\n', b'a te\\n', b'st.']
 
248
        >>> f = IterableFile([b'Th\\nis ', b'is \\n', b'a ', b'te\\nst.'])
238
249
        >>> f.close()
239
250
        >>> f.readlines()
240
251
        Traceback (most recent call last):