/brz/remove-bazaar

To get this branch, use:
bzr branch http://gegoxaren.bato24.eu/bzr/brz/remove-bazaar

« back to all changes in this revision

Viewing changes to breezy/iterablefile.py

  • Committer: Jelmer Vernooij
  • Date: 2020-02-07 02:14:30 UTC
  • mto: This revision was merged to the branch mainline in revision 7492.
  • Revision ID: jelmer@jelmer.uk-20200207021430-m49iq3x4x8xlib6x
Drop python2 support.

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
# Copyright (C) 2005 Aaron Bentley, Canonical Ltd
 
2
# <aaron.bentley@utoronto.ca>
 
3
#
 
4
# This program is free software; you can redistribute it and/or modify
 
5
# it under the terms of the GNU General Public License as published by
 
6
# the Free Software Foundation; either version 2 of the License, or
 
7
# (at your option) any later version.
 
8
#
 
9
# This program is distributed in the hope that it will be useful,
 
10
# but WITHOUT ANY WARRANTY; without even the implied warranty of
 
11
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
12
# GNU General Public License for more details.
 
13
#
 
14
# You should have received a copy of the GNU General Public License
 
15
# along with this program; if not, write to the Free Software
 
16
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
 
17
 
 
18
from __future__ import absolute_import
 
19
 
 
20
 
 
21
class IterableFileBase(object):
 
22
    """Create a file-like object from any iterable"""
 
23
 
 
24
    def __init__(self, iterable):
 
25
        object.__init__(self)
 
26
        self._iter = iterable.__iter__()
 
27
        self._buffer = b""
 
28
        self.done = False
 
29
 
 
30
    def read_n(self, length):
 
31
        """
 
32
        >>> IterableFileBase([b'This ', b'is ', b'a ', b'test.']).read_n(8)
 
33
        b'This is '
 
34
        """
 
35
        def test_length(result):
 
36
            if len(result) >= length:
 
37
                return length
 
38
            else:
 
39
                return None
 
40
        return self._read(test_length)
 
41
 
 
42
    def read_to(self, sequence, length=None):
 
43
        """
 
44
        >>> f = IterableFileBase([b'Th\\nis ', b'is \\n', b'a ', b'te\\nst.'])
 
45
        >>> f.read_to(b'\\n')
 
46
        b'Th\\n'
 
47
        >>> f.read_to(b'\\n')
 
48
        b'is is \\n'
 
49
        """
 
50
        def test_contents(result):
 
51
            if length is not None:
 
52
                if len(result) >= length:
 
53
                    return length
 
54
            try:
 
55
                return result.index(sequence) + len(sequence)
 
56
            except ValueError:
 
57
                return None
 
58
        return self._read(test_contents)
 
59
 
 
60
    def _read(self, result_length):
 
61
        """
 
62
        Read data until result satisfies the condition result_length.
 
63
        result_length is a callable that returns None until the condition
 
64
        is satisfied, and returns the length of the result to use when
 
65
        the condition is satisfied.  (i.e. it returns the length of the
 
66
        subset of the first condition match.)
 
67
        """
 
68
        result = self._buffer
 
69
        while result_length(result) is None:
 
70
            try:
 
71
                result += next(self._iter)
 
72
            except StopIteration:
 
73
                self.done = True
 
74
                self._buffer = b""
 
75
                return result
 
76
        output_length = result_length(result)
 
77
        self._buffer = result[output_length:]
 
78
        return result[:output_length]
 
79
 
 
80
    def read_all(self):
 
81
        """
 
82
        >>> IterableFileBase([b'This ', b'is ', b'a ', b'test.']).read_all()
 
83
        b'This is a test.'
 
84
        """
 
85
        def no_stop(result):
 
86
            return None
 
87
        return self._read(no_stop)
 
88
 
 
89
    def push_back(self, contents):
 
90
        """
 
91
        >>> f = IterableFileBase([b'Th\\nis ', b'is \\n', b'a ', b'te\\nst.'])
 
92
        >>> f.read_to(b'\\n')
 
93
        b'Th\\n'
 
94
        >>> f.push_back(b"Sh")
 
95
        >>> f.read_all()
 
96
        b'Shis is \\na te\\nst.'
 
97
        """
 
98
        self._buffer = contents + self._buffer
 
99
 
 
100
 
 
101
class IterableFile(object):
 
102
    """This class supplies all File methods that can be implemented cheaply."""
 
103
 
 
104
    def __init__(self, iterable):
 
105
        object.__init__(self)
 
106
        self._file_base = IterableFileBase(iterable)
 
107
        self._iter = self._make_iterator()
 
108
        self._closed = False
 
109
        self.softspace = 0
 
110
 
 
111
    def _make_iterator(self):
 
112
        while not self._file_base.done:
 
113
            self._check_closed()
 
114
            result = self._file_base.read_to(b'\n')
 
115
            if result != b'':
 
116
                yield result
 
117
 
 
118
    def _check_closed(self):
 
119
        if self.closed:
 
120
            raise ValueError("File is closed.")
 
121
 
 
122
    def close(self):
 
123
        """
 
124
        >>> f = IterableFile([b'This ', b'is ', b'a ', b'test.'])
 
125
        >>> f.closed
 
126
        False
 
127
        >>> f.close()
 
128
        >>> f.closed
 
129
        True
 
130
        """
 
131
        self._file_base.done = True
 
132
        self._closed = True
 
133
 
 
134
    closed = property(lambda x: x._closed)
 
135
 
 
136
    def __enter__(self):
 
137
        return self
 
138
 
 
139
    def __exit__(self, exc_type, exc_val, exc_tb):
 
140
        # If there was an error raised, prefer the original one
 
141
        try:
 
142
            self.close()
 
143
        except BaseException:
 
144
            if exc_type is None:
 
145
                raise
 
146
        return False
 
147
 
 
148
    def flush(self):
 
149
        """No-op for standard compliance.
 
150
        >>> f = IterableFile([])
 
151
        >>> f.close()
 
152
        >>> f.flush()
 
153
        Traceback (most recent call last):
 
154
        ValueError: File is closed.
 
155
        """
 
156
        self._check_closed()
 
157
 
 
158
    def __next__(self):
 
159
        """Implementation of the iterator protocol's next()
 
160
 
 
161
        >>> f = IterableFile([b'This \\n', b'is ', b'a ', b'test.'])
 
162
        >>> next(f)
 
163
        b'This \\n'
 
164
        >>> f.close()
 
165
        >>> next(f)
 
166
        Traceback (most recent call last):
 
167
        ValueError: File is closed.
 
168
        >>> f = IterableFile([b'This \\n', b'is ', b'a ', b'test.\\n'])
 
169
        >>> next(f)
 
170
        b'This \\n'
 
171
        >>> next(f)
 
172
        b'is a test.\\n'
 
173
        >>> next(f)
 
174
        Traceback (most recent call last):
 
175
        StopIteration
 
176
        """
 
177
        self._check_closed()
 
178
        return next(self._iter)
 
179
 
 
180
    next = __next__
 
181
 
 
182
    def __iter__(self):
 
183
        """
 
184
        >>> list(IterableFile([b'Th\\nis ', b'is \\n', b'a ', b'te\\nst.']))
 
185
        [b'Th\\n', b'is is \\n', b'a te\\n', b'st.']
 
186
        >>> f = IterableFile([b'Th\\nis ', b'is \\n', b'a ', b'te\\nst.'])
 
187
        >>> f.close()
 
188
        >>> list(f)
 
189
        Traceback (most recent call last):
 
190
        ValueError: File is closed.
 
191
        """
 
192
        return self
 
193
 
 
194
    def read(self, length=None):
 
195
        """
 
196
        >>> IterableFile([b'This ', b'is ', b'a ', b'test.']).read()
 
197
        b'This is a test.'
 
198
        >>> f = IterableFile([b'This ', b'is ', b'a ', b'test.'])
 
199
        >>> f.read(10)
 
200
        b'This is a '
 
201
        >>> f = IterableFile([b'This ', b'is ', b'a ', b'test.'])
 
202
        >>> f.close()
 
203
        >>> f.read(10)
 
204
        Traceback (most recent call last):
 
205
        ValueError: File is closed.
 
206
        """
 
207
        self._check_closed()
 
208
        if length is None:
 
209
            return self._file_base.read_all()
 
210
        else:
 
211
            return self._file_base.read_n(length)
 
212
 
 
213
    def read_to(self, sequence, size=None):
 
214
        """
 
215
        Read characters until a sequence is found, with optional max size.
 
216
        The specified sequence, if found, will be included in the result
 
217
 
 
218
        >>> f = IterableFile([b'Th\\nis ', b'is \\n', b'a ', b'te\\nst.'])
 
219
        >>> f.read_to(b'i')
 
220
        b'Th\\ni'
 
221
        >>> f.read_to(b'i')
 
222
        b's i'
 
223
        >>> f.close()
 
224
        >>> f.read_to(b'i')
 
225
        Traceback (most recent call last):
 
226
        ValueError: File is closed.
 
227
        """
 
228
        self._check_closed()
 
229
        return self._file_base.read_to(sequence, size)
 
230
 
 
231
    def readline(self, size=None):
 
232
        """
 
233
        >>> f = IterableFile([b'Th\\nis ', b'is \\n', b'a ', b'te\\nst.'])
 
234
        >>> f.readline()
 
235
        b'Th\\n'
 
236
        >>> f.readline(4)
 
237
        b'is i'
 
238
        >>> f.close()
 
239
        >>> f.readline()
 
240
        Traceback (most recent call last):
 
241
        ValueError: File is closed.
 
242
        """
 
243
        return self.read_to(b'\n', size)
 
244
 
 
245
    def readlines(self, sizehint=None):
 
246
        """
 
247
        >>> f = IterableFile([b'Th\\nis ', b'is \\n', b'a ', b'te\\nst.'])
 
248
        >>> f.readlines()
 
249
        [b'Th\\n', b'is is \\n', b'a te\\n', b'st.']
 
250
        >>> f = IterableFile([b'Th\\nis ', b'is \\n', b'a ', b'te\\nst.'])
 
251
        >>> f.close()
 
252
        >>> f.readlines()
 
253
        Traceback (most recent call last):
 
254
        ValueError: File is closed.
 
255
        """
 
256
        lines = []
 
257
        while True:
 
258
            line = self.readline()
 
259
            if line == b"":
 
260
                return lines
 
261
            if sizehint is None:
 
262
                lines.append(line)
 
263
            elif len(line) < sizehint:
 
264
                lines.append(line)
 
265
                sizehint -= len(line)
 
266
            else:
 
267
                self._file_base.push_back(line)
 
268
                return lines
 
269
 
 
270
 
 
271
if __name__ == "__main__":
 
272
    import doctest
 
273
    doctest.testmod()