/brz/remove-bazaar

To get this branch, use:
bzr branch http://gegoxaren.bato24.eu/bzr/brz/remove-bazaar

« back to all changes in this revision

Viewing changes to breezy/iterablefile.py

  • Committer: Jelmer Vernooij
  • Date: 2018-06-14 17:59:16 UTC
  • mto: This revision was merged to the branch mainline in revision 7065.
  • Revision ID: jelmer@jelmer.uk-20180614175916-a2e2xh5k533guq1x
Move breezy.plugins.git to breezy.git.

Show diffs side-by-side

added added

removed removed

Lines of Context:
15
15
# along with this program; if not, write to the Free Software
16
16
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
17
17
 
 
18
from __future__ import absolute_import
18
19
 
19
20
class IterableFileBase(object):
20
21
    """Create a file-like object from any iterable"""
27
28
 
28
29
    def read_n(self, length):
29
30
        """
30
 
        >>> IterableFileBase([b'This ', b'is ', b'a ', b'test.']).read_n(8)
31
 
        b'This is '
 
31
        >>> IterableFileBase(['This ', 'is ', 'a ', 'test.']).read_n(8)
 
32
        'This is '
32
33
        """
33
34
        def test_length(result):
34
35
            if len(result) >= length:
39
40
 
40
41
    def read_to(self, sequence, length=None):
41
42
        """
42
 
        >>> f = IterableFileBase([b'Th\\nis ', b'is \\n', b'a ', b'te\\nst.'])
43
 
        >>> f.read_to(b'\\n')
44
 
        b'Th\\n'
45
 
        >>> f.read_to(b'\\n')
46
 
        b'is is \\n'
 
43
        >>> f = IterableFileBase(['Th\\nis ', 'is \\n', 'a ', 'te\\nst.'])
 
44
        >>> f.read_to('\\n')
 
45
        'Th\\n'
 
46
        >>> f.read_to('\\n')
 
47
        'is is \\n'
47
48
        """
48
49
        def test_contents(result):
49
50
            if length is not None:
50
51
                if len(result) >= length:
51
52
                    return length
52
53
            try:
53
 
                return result.index(sequence) + len(sequence)
 
54
                return result.index(sequence)+len(sequence)
54
55
            except ValueError:
55
56
                return None
56
57
        return self._read(test_contents)
77
78
 
78
79
    def read_all(self):
79
80
        """
80
 
        >>> IterableFileBase([b'This ', b'is ', b'a ', b'test.']).read_all()
81
 
        b'This is a test.'
 
81
        >>> IterableFileBase(['This ', 'is ', 'a ', 'test.']).read_all()
 
82
        'This is a test.'
82
83
        """
83
84
        def no_stop(result):
84
85
            return None
85
86
        return self._read(no_stop)
86
87
 
 
88
 
87
89
    def push_back(self, contents):
88
90
        """
89
 
        >>> f = IterableFileBase([b'Th\\nis ', b'is \\n', b'a ', b'te\\nst.'])
90
 
        >>> f.read_to(b'\\n')
91
 
        b'Th\\n'
92
 
        >>> f.push_back(b"Sh")
 
91
        >>> f = IterableFileBase(['Th\\nis ', 'is \\n', 'a ', 'te\\nst.'])
 
92
        >>> f.read_to('\\n')
 
93
        'Th\\n'
 
94
        >>> f.push_back("Sh")
93
95
        >>> f.read_all()
94
 
        b'Shis is \\na te\\nst.'
 
96
        'Shis is \\na te\\nst.'
95
97
        """
96
98
        self._buffer = contents + self._buffer
97
99
 
98
100
 
99
101
class IterableFile(object):
100
102
    """This class supplies all File methods that can be implemented cheaply."""
101
 
 
102
103
    def __init__(self, iterable):
103
104
        object.__init__(self)
104
105
        self._file_base = IterableFileBase(iterable)
119
120
 
120
121
    def close(self):
121
122
        """
122
 
        >>> f = IterableFile([b'This ', b'is ', b'a ', b'test.'])
 
123
        >>> f = IterableFile(['This ', 'is ', 'a ', 'test.'])
123
124
        >>> f.closed
124
125
        False
125
126
        >>> f.close()
138
139
        # If there was an error raised, prefer the original one
139
140
        try:
140
141
            self.close()
141
 
        except BaseException:
 
142
        except:
142
143
            if exc_type is None:
143
144
                raise
144
145
        return False
156
157
    def __next__(self):
157
158
        """Implementation of the iterator protocol's next()
158
159
 
159
 
        >>> f = IterableFile([b'This \\n', b'is ', b'a ', b'test.'])
 
160
        >>> f = IterableFile(['This \\n', 'is ', 'a ', 'test.'])
160
161
        >>> next(f)
161
 
        b'This \\n'
 
162
        'This \\n'
162
163
        >>> f.close()
163
164
        >>> next(f)
164
165
        Traceback (most recent call last):
165
166
        ValueError: File is closed.
166
 
        >>> f = IterableFile([b'This \\n', b'is ', b'a ', b'test.\\n'])
167
 
        >>> next(f)
168
 
        b'This \\n'
169
 
        >>> next(f)
170
 
        b'is a test.\\n'
 
167
        >>> f = IterableFile(['This \\n', 'is ', 'a ', 'test.\\n'])
 
168
        >>> next(f)
 
169
        'This \\n'
 
170
        >>> next(f)
 
171
        'is a test.\\n'
171
172
        >>> next(f)
172
173
        Traceback (most recent call last):
173
174
        StopIteration
179
180
 
180
181
    def __iter__(self):
181
182
        """
182
 
        >>> list(IterableFile([b'Th\\nis ', b'is \\n', b'a ', b'te\\nst.']))
183
 
        [b'Th\\n', b'is is \\n', b'a te\\n', b'st.']
184
 
        >>> f = IterableFile([b'Th\\nis ', b'is \\n', b'a ', b'te\\nst.'])
 
183
        >>> list(IterableFile(['Th\\nis ', 'is \\n', 'a ', 'te\\nst.']))
 
184
        ['Th\\n', 'is is \\n', 'a te\\n', 'st.']
 
185
        >>> f = IterableFile(['Th\\nis ', 'is \\n', 'a ', 'te\\nst.'])
185
186
        >>> f.close()
186
187
        >>> list(f)
187
188
        Traceback (most recent call last):
191
192
 
192
193
    def read(self, length=None):
193
194
        """
194
 
        >>> IterableFile([b'This ', b'is ', b'a ', b'test.']).read()
195
 
        b'This is a test.'
196
 
        >>> f = IterableFile([b'This ', b'is ', b'a ', b'test.'])
 
195
        >>> IterableFile(['This ', 'is ', 'a ', 'test.']).read()
 
196
        'This is a test.'
 
197
        >>> f = IterableFile(['This ', 'is ', 'a ', 'test.'])
197
198
        >>> f.read(10)
198
 
        b'This is a '
199
 
        >>> f = IterableFile([b'This ', b'is ', b'a ', b'test.'])
 
199
        'This is a '
 
200
        >>> f = IterableFile(['This ', 'is ', 'a ', 'test.'])
200
201
        >>> f.close()
201
202
        >>> f.read(10)
202
203
        Traceback (most recent call last):
213
214
        Read characters until a sequence is found, with optional max size.
214
215
        The specified sequence, if found, will be included in the result
215
216
 
216
 
        >>> f = IterableFile([b'Th\\nis ', b'is \\n', b'a ', b'te\\nst.'])
217
 
        >>> f.read_to(b'i')
218
 
        b'Th\\ni'
219
 
        >>> f.read_to(b'i')
220
 
        b's i'
 
217
        >>> f = IterableFile(['Th\\nis ', 'is \\n', 'a ', 'te\\nst.'])
 
218
        >>> f.read_to('i')
 
219
        'Th\\ni'
 
220
        >>> f.read_to('i')
 
221
        's i'
221
222
        >>> f.close()
222
 
        >>> f.read_to(b'i')
 
223
        >>> f.read_to('i')
223
224
        Traceback (most recent call last):
224
225
        ValueError: File is closed.
225
226
        """
228
229
 
229
230
    def readline(self, size=None):
230
231
        """
231
 
        >>> f = IterableFile([b'Th\\nis ', b'is \\n', b'a ', b'te\\nst.'])
 
232
        >>> f = IterableFile(['Th\\nis ', 'is \\n', 'a ', 'te\\nst.'])
232
233
        >>> f.readline()
233
 
        b'Th\\n'
 
234
        'Th\\n'
234
235
        >>> f.readline(4)
235
 
        b'is i'
 
236
        'is i'
236
237
        >>> f.close()
237
238
        >>> f.readline()
238
239
        Traceback (most recent call last):
242
243
 
243
244
    def readlines(self, sizehint=None):
244
245
        """
245
 
        >>> f = IterableFile([b'Th\\nis ', b'is \\n', b'a ', b'te\\nst.'])
 
246
        >>> f = IterableFile(['Th\\nis ', 'is \\n', 'a ', 'te\\nst.'])
246
247
        >>> f.readlines()
247
 
        [b'Th\\n', b'is is \\n', b'a te\\n', b'st.']
248
 
        >>> f = IterableFile([b'Th\\nis ', b'is \\n', b'a ', b'te\\nst.'])
 
248
        ['Th\\n', 'is is \\n', 'a te\\n', 'st.']
 
249
        >>> f = IterableFile(['Th\\nis ', 'is \\n', 'a ', 'te\\nst.'])
249
250
        >>> f.close()
250
251
        >>> f.readlines()
251
252
        Traceback (most recent call last):