/brz/remove-bazaar

To get this branch, use:
bzr branch http://gegoxaren.bato24.eu/bzr/brz/remove-bazaar

« back to all changes in this revision

Viewing changes to breezy/iterablefile.py

  • Committer: Breezy landing bot
  • Author(s): Colin Watson
  • Date: 2020-11-16 21:47:08 UTC
  • mfrom: (7521.1.1 remove-lp-workaround)
  • Revision ID: breezy.the.bot@gmail.com-20201116214708-jos209mgxi41oy15
Remove breezy.git workaround for bazaar.launchpad.net.

Merged from https://code.launchpad.net/~cjwatson/brz/remove-lp-workaround/+merge/393710

Show diffs side-by-side

added added

removed removed

Lines of Context:
15
15
# along with this program; if not, write to the Free Software
16
16
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
17
17
 
18
 
from __future__ import absolute_import
19
18
 
20
19
class IterableFileBase(object):
21
20
    """Create a file-like object from any iterable"""
23
22
    def __init__(self, iterable):
24
23
        object.__init__(self)
25
24
        self._iter = iterable.__iter__()
26
 
        self._buffer = ""
 
25
        self._buffer = b""
27
26
        self.done = False
28
27
 
29
28
    def read_n(self, length):
30
29
        """
31
 
        >>> IterableFileBase(['This ', 'is ', 'a ', 'test.']).read_n(8)
32
 
        'This is '
 
30
        >>> IterableFileBase([b'This ', b'is ', b'a ', b'test.']).read_n(8)
 
31
        b'This is '
33
32
        """
34
33
        def test_length(result):
35
34
            if len(result) >= length:
40
39
 
41
40
    def read_to(self, sequence, length=None):
42
41
        """
43
 
        >>> f = IterableFileBase(['Th\\nis ', 'is \\n', 'a ', 'te\\nst.'])
44
 
        >>> f.read_to('\\n')
45
 
        'Th\\n'
46
 
        >>> f.read_to('\\n')
47
 
        'is is \\n'
 
42
        >>> f = IterableFileBase([b'Th\\nis ', b'is \\n', b'a ', b'te\\nst.'])
 
43
        >>> f.read_to(b'\\n')
 
44
        b'Th\\n'
 
45
        >>> f.read_to(b'\\n')
 
46
        b'is is \\n'
48
47
        """
49
48
        def test_contents(result):
50
49
            if length is not None:
51
50
                if len(result) >= length:
52
51
                    return length
53
52
            try:
54
 
                return result.index(sequence)+len(sequence)
 
53
                return result.index(sequence) + len(sequence)
55
54
            except ValueError:
56
55
                return None
57
56
        return self._read(test_contents)
67
66
        result = self._buffer
68
67
        while result_length(result) is None:
69
68
            try:
70
 
                result += self._iter.next()
 
69
                result += next(self._iter)
71
70
            except StopIteration:
72
71
                self.done = True
73
 
                self._buffer = ""
 
72
                self._buffer = b""
74
73
                return result
75
74
        output_length = result_length(result)
76
75
        self._buffer = result[output_length:]
78
77
 
79
78
    def read_all(self):
80
79
        """
81
 
        >>> IterableFileBase(['This ', 'is ', 'a ', 'test.']).read_all()
82
 
        'This is a test.'
 
80
        >>> IterableFileBase([b'This ', b'is ', b'a ', b'test.']).read_all()
 
81
        b'This is a test.'
83
82
        """
84
83
        def no_stop(result):
85
84
            return None
86
85
        return self._read(no_stop)
87
86
 
88
 
 
89
87
    def push_back(self, contents):
90
88
        """
91
 
        >>> f = IterableFileBase(['Th\\nis ', 'is \\n', 'a ', 'te\\nst.'])
92
 
        >>> f.read_to('\\n')
93
 
        'Th\\n'
94
 
        >>> f.push_back("Sh")
 
89
        >>> f = IterableFileBase([b'Th\\nis ', b'is \\n', b'a ', b'te\\nst.'])
 
90
        >>> f.read_to(b'\\n')
 
91
        b'Th\\n'
 
92
        >>> f.push_back(b"Sh")
95
93
        >>> f.read_all()
96
 
        'Shis is \\na te\\nst.'
 
94
        b'Shis is \\na te\\nst.'
97
95
        """
98
96
        self._buffer = contents + self._buffer
99
97
 
100
98
 
101
99
class IterableFile(object):
102
100
    """This class supplies all File methods that can be implemented cheaply."""
 
101
 
103
102
    def __init__(self, iterable):
104
103
        object.__init__(self)
105
104
        self._file_base = IterableFileBase(iterable)
110
109
    def _make_iterator(self):
111
110
        while not self._file_base.done:
112
111
            self._check_closed()
113
 
            result = self._file_base.read_to('\n')
114
 
            if result != '':
 
112
            result = self._file_base.read_to(b'\n')
 
113
            if result != b'':
115
114
                yield result
116
115
 
117
116
    def _check_closed(self):
120
119
 
121
120
    def close(self):
122
121
        """
123
 
        >>> f = IterableFile(['This ', 'is ', 'a ', 'test.'])
 
122
        >>> f = IterableFile([b'This ', b'is ', b'a ', b'test.'])
124
123
        >>> f.closed
125
124
        False
126
125
        >>> f.close()
132
131
 
133
132
    closed = property(lambda x: x._closed)
134
133
 
 
134
    def __enter__(self):
 
135
        return self
 
136
 
 
137
    def __exit__(self, exc_type, exc_val, exc_tb):
 
138
        # If there was an error raised, prefer the original one
 
139
        try:
 
140
            self.close()
 
141
        except BaseException:
 
142
            if exc_type is None:
 
143
                raise
 
144
        return False
 
145
 
135
146
    def flush(self):
136
147
        """No-op for standard compliance.
137
148
        >>> f = IterableFile([])
142
153
        """
143
154
        self._check_closed()
144
155
 
145
 
    def next(self):
 
156
    def __next__(self):
146
157
        """Implementation of the iterator protocol's next()
147
158
 
148
 
        >>> f = IterableFile(['This \\n', 'is ', 'a ', 'test.'])
149
 
        >>> f.next()
150
 
        'This \\n'
 
159
        >>> f = IterableFile([b'This \\n', b'is ', b'a ', b'test.'])
 
160
        >>> next(f)
 
161
        b'This \\n'
151
162
        >>> f.close()
152
 
        >>> f.next()
 
163
        >>> next(f)
153
164
        Traceback (most recent call last):
154
165
        ValueError: File is closed.
155
 
        >>> f = IterableFile(['This \\n', 'is ', 'a ', 'test.\\n'])
156
 
        >>> f.next()
157
 
        'This \\n'
158
 
        >>> f.next()
159
 
        'is a test.\\n'
160
 
        >>> f.next()
 
166
        >>> f = IterableFile([b'This \\n', b'is ', b'a ', b'test.\\n'])
 
167
        >>> next(f)
 
168
        b'This \\n'
 
169
        >>> next(f)
 
170
        b'is a test.\\n'
 
171
        >>> next(f)
161
172
        Traceback (most recent call last):
162
173
        StopIteration
163
174
        """
164
175
        self._check_closed()
165
 
        return self._iter.next()
 
176
        return next(self._iter)
 
177
 
 
178
    next = __next__
166
179
 
167
180
    def __iter__(self):
168
181
        """
169
 
        >>> list(IterableFile(['Th\\nis ', 'is \\n', 'a ', 'te\\nst.']))
170
 
        ['Th\\n', 'is is \\n', 'a te\\n', 'st.']
171
 
        >>> f = IterableFile(['Th\\nis ', 'is \\n', 'a ', 'te\\nst.'])
 
182
        >>> list(IterableFile([b'Th\\nis ', b'is \\n', b'a ', b'te\\nst.']))
 
183
        [b'Th\\n', b'is is \\n', b'a te\\n', b'st.']
 
184
        >>> f = IterableFile([b'Th\\nis ', b'is \\n', b'a ', b'te\\nst.'])
172
185
        >>> f.close()
173
186
        >>> list(f)
174
187
        Traceback (most recent call last):
178
191
 
179
192
    def read(self, length=None):
180
193
        """
181
 
        >>> IterableFile(['This ', 'is ', 'a ', 'test.']).read()
182
 
        'This is a test.'
183
 
        >>> f = IterableFile(['This ', 'is ', 'a ', 'test.'])
 
194
        >>> IterableFile([b'This ', b'is ', b'a ', b'test.']).read()
 
195
        b'This is a test.'
 
196
        >>> f = IterableFile([b'This ', b'is ', b'a ', b'test.'])
184
197
        >>> f.read(10)
185
 
        'This is a '
186
 
        >>> f = IterableFile(['This ', 'is ', 'a ', 'test.'])
 
198
        b'This is a '
 
199
        >>> f = IterableFile([b'This ', b'is ', b'a ', b'test.'])
187
200
        >>> f.close()
188
201
        >>> f.read(10)
189
202
        Traceback (most recent call last):
200
213
        Read characters until a sequence is found, with optional max size.
201
214
        The specified sequence, if found, will be included in the result
202
215
 
203
 
        >>> f = IterableFile(['Th\\nis ', 'is \\n', 'a ', 'te\\nst.'])
204
 
        >>> f.read_to('i')
205
 
        'Th\\ni'
206
 
        >>> f.read_to('i')
207
 
        's i'
 
216
        >>> f = IterableFile([b'Th\\nis ', b'is \\n', b'a ', b'te\\nst.'])
 
217
        >>> f.read_to(b'i')
 
218
        b'Th\\ni'
 
219
        >>> f.read_to(b'i')
 
220
        b's i'
208
221
        >>> f.close()
209
 
        >>> f.read_to('i')
 
222
        >>> f.read_to(b'i')
210
223
        Traceback (most recent call last):
211
224
        ValueError: File is closed.
212
225
        """
215
228
 
216
229
    def readline(self, size=None):
217
230
        """
218
 
        >>> f = IterableFile(['Th\\nis ', 'is \\n', 'a ', 'te\\nst.'])
 
231
        >>> f = IterableFile([b'Th\\nis ', b'is \\n', b'a ', b'te\\nst.'])
219
232
        >>> f.readline()
220
 
        'Th\\n'
 
233
        b'Th\\n'
221
234
        >>> f.readline(4)
222
 
        'is i'
 
235
        b'is i'
223
236
        >>> f.close()
224
237
        >>> f.readline()
225
238
        Traceback (most recent call last):
226
239
        ValueError: File is closed.
227
240
        """
228
 
        return self.read_to('\n', size)
 
241
        return self.read_to(b'\n', size)
229
242
 
230
243
    def readlines(self, sizehint=None):
231
244
        """
232
 
        >>> f = IterableFile(['Th\\nis ', 'is \\n', 'a ', 'te\\nst.'])
 
245
        >>> f = IterableFile([b'Th\\nis ', b'is \\n', b'a ', b'te\\nst.'])
233
246
        >>> f.readlines()
234
 
        ['Th\\n', 'is is \\n', 'a te\\n', 'st.']
235
 
        >>> f = IterableFile(['Th\\nis ', 'is \\n', 'a ', 'te\\nst.'])
 
247
        [b'Th\\n', b'is is \\n', b'a te\\n', b'st.']
 
248
        >>> f = IterableFile([b'Th\\nis ', b'is \\n', b'a ', b'te\\nst.'])
236
249
        >>> f.close()
237
250
        >>> f.readlines()
238
251
        Traceback (most recent call last):
241
254
        lines = []
242
255
        while True:
243
256
            line = self.readline()
244
 
            if line == "":
 
257
            if line == b"":
245
258
                return lines
246
259
            if sizehint is None:
247
260
                lines.append(line)