/brz/remove-bazaar

To get this branch, use:
bzr branch http://gegoxaren.bato24.eu/bzr/brz/remove-bazaar

« back to all changes in this revision

Viewing changes to breezy/iterablefile.py

  • Committer: Jelmer Vernooij
  • Date: 2018-05-07 15:27:39 UTC
  • mto: This revision was merged to the branch mainline in revision 6958.
  • Revision ID: jelmer@jelmer.uk-20180507152739-fuv9z9r0yzi7ln3t
Specify source in .coveragerc.

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
# Copyright (C) 2005 Aaron Bentley, Canonical Ltd
 
2
# <aaron.bentley@utoronto.ca>
 
3
#
 
4
# This program is free software; you can redistribute it and/or modify
 
5
# it under the terms of the GNU General Public License as published by
 
6
# the Free Software Foundation; either version 2 of the License, or
 
7
# (at your option) any later version.
 
8
#
 
9
# This program is distributed in the hope that it will be useful,
 
10
# but WITHOUT ANY WARRANTY; without even the implied warranty of
 
11
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
12
# GNU General Public License for more details.
 
13
#
 
14
# You should have received a copy of the GNU General Public License
 
15
# along with this program; if not, write to the Free Software
 
16
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
 
17
 
 
18
from __future__ import absolute_import
 
19
 
 
20
class IterableFileBase(object):
 
21
    """Create a file-like object from any iterable"""
 
22
 
 
23
    def __init__(self, iterable):
 
24
        object.__init__(self)
 
25
        self._iter = iterable.__iter__()
 
26
        self._buffer = b""
 
27
        self.done = False
 
28
 
 
29
    def read_n(self, length):
 
30
        """
 
31
        >>> IterableFileBase(['This ', 'is ', 'a ', 'test.']).read_n(8)
 
32
        'This is '
 
33
        """
 
34
        def test_length(result):
 
35
            if len(result) >= length:
 
36
                return length
 
37
            else:
 
38
                return None
 
39
        return self._read(test_length)
 
40
 
 
41
    def read_to(self, sequence, length=None):
 
42
        """
 
43
        >>> f = IterableFileBase(['Th\\nis ', 'is \\n', 'a ', 'te\\nst.'])
 
44
        >>> f.read_to('\\n')
 
45
        'Th\\n'
 
46
        >>> f.read_to('\\n')
 
47
        'is is \\n'
 
48
        """
 
49
        def test_contents(result):
 
50
            if length is not None:
 
51
                if len(result) >= length:
 
52
                    return length
 
53
            try:
 
54
                return result.index(sequence)+len(sequence)
 
55
            except ValueError:
 
56
                return None
 
57
        return self._read(test_contents)
 
58
 
 
59
    def _read(self, result_length):
 
60
        """
 
61
        Read data until result satisfies the condition result_length.
 
62
        result_length is a callable that returns None until the condition
 
63
        is satisfied, and returns the length of the result to use when
 
64
        the condition is satisfied.  (i.e. it returns the length of the
 
65
        subset of the first condition match.)
 
66
        """
 
67
        result = self._buffer
 
68
        while result_length(result) is None:
 
69
            try:
 
70
                result += next(self._iter)
 
71
            except StopIteration:
 
72
                self.done = True
 
73
                self._buffer = b""
 
74
                return result
 
75
        output_length = result_length(result)
 
76
        self._buffer = result[output_length:]
 
77
        return result[:output_length]
 
78
 
 
79
    def read_all(self):
 
80
        """
 
81
        >>> IterableFileBase(['This ', 'is ', 'a ', 'test.']).read_all()
 
82
        'This is a test.'
 
83
        """
 
84
        def no_stop(result):
 
85
            return None
 
86
        return self._read(no_stop)
 
87
 
 
88
 
 
89
    def push_back(self, contents):
 
90
        """
 
91
        >>> f = IterableFileBase(['Th\\nis ', 'is \\n', 'a ', 'te\\nst.'])
 
92
        >>> f.read_to('\\n')
 
93
        'Th\\n'
 
94
        >>> f.push_back("Sh")
 
95
        >>> f.read_all()
 
96
        'Shis is \\na te\\nst.'
 
97
        """
 
98
        self._buffer = contents + self._buffer
 
99
 
 
100
 
 
101
class IterableFile(object):
 
102
    """This class supplies all File methods that can be implemented cheaply."""
 
103
    def __init__(self, iterable):
 
104
        object.__init__(self)
 
105
        self._file_base = IterableFileBase(iterable)
 
106
        self._iter = self._make_iterator()
 
107
        self._closed = False
 
108
        self.softspace = 0
 
109
 
 
110
    def _make_iterator(self):
 
111
        while not self._file_base.done:
 
112
            self._check_closed()
 
113
            result = self._file_base.read_to(b'\n')
 
114
            if result != b'':
 
115
                yield result
 
116
 
 
117
    def _check_closed(self):
 
118
        if self.closed:
 
119
            raise ValueError("File is closed.")
 
120
 
 
121
    def close(self):
 
122
        """
 
123
        >>> f = IterableFile(['This ', 'is ', 'a ', 'test.'])
 
124
        >>> f.closed
 
125
        False
 
126
        >>> f.close()
 
127
        >>> f.closed
 
128
        True
 
129
        """
 
130
        self._file_base.done = True
 
131
        self._closed = True
 
132
 
 
133
    closed = property(lambda x: x._closed)
 
134
 
 
135
    def __enter__(self):
 
136
        return self
 
137
 
 
138
    def __exit__(self, exc_type, exc_val, exc_tb):
 
139
        # If there was an error raised, prefer the original one
 
140
        try:
 
141
            self.close()
 
142
        except:
 
143
            if exc_type is None:
 
144
                raise
 
145
        return False
 
146
 
 
147
    def flush(self):
 
148
        """No-op for standard compliance.
 
149
        >>> f = IterableFile([])
 
150
        >>> f.close()
 
151
        >>> f.flush()
 
152
        Traceback (most recent call last):
 
153
        ValueError: File is closed.
 
154
        """
 
155
        self._check_closed()
 
156
 
 
157
    def __next__(self):
 
158
        """Implementation of the iterator protocol's next()
 
159
 
 
160
        >>> f = IterableFile(['This \\n', 'is ', 'a ', 'test.'])
 
161
        >>> next(f)
 
162
        'This \\n'
 
163
        >>> f.close()
 
164
        >>> next(f)
 
165
        Traceback (most recent call last):
 
166
        ValueError: File is closed.
 
167
        >>> f = IterableFile(['This \\n', 'is ', 'a ', 'test.\\n'])
 
168
        >>> next(f)
 
169
        'This \\n'
 
170
        >>> next(f)
 
171
        'is a test.\\n'
 
172
        >>> next(f)
 
173
        Traceback (most recent call last):
 
174
        StopIteration
 
175
        """
 
176
        self._check_closed()
 
177
        return next(self._iter)
 
178
 
 
179
    next = __next__
 
180
 
 
181
    def __iter__(self):
 
182
        """
 
183
        >>> list(IterableFile(['Th\\nis ', 'is \\n', 'a ', 'te\\nst.']))
 
184
        ['Th\\n', 'is is \\n', 'a te\\n', 'st.']
 
185
        >>> f = IterableFile(['Th\\nis ', 'is \\n', 'a ', 'te\\nst.'])
 
186
        >>> f.close()
 
187
        >>> list(f)
 
188
        Traceback (most recent call last):
 
189
        ValueError: File is closed.
 
190
        """
 
191
        return self
 
192
 
 
193
    def read(self, length=None):
 
194
        """
 
195
        >>> IterableFile(['This ', 'is ', 'a ', 'test.']).read()
 
196
        'This is a test.'
 
197
        >>> f = IterableFile(['This ', 'is ', 'a ', 'test.'])
 
198
        >>> f.read(10)
 
199
        'This is a '
 
200
        >>> f = IterableFile(['This ', 'is ', 'a ', 'test.'])
 
201
        >>> f.close()
 
202
        >>> f.read(10)
 
203
        Traceback (most recent call last):
 
204
        ValueError: File is closed.
 
205
        """
 
206
        self._check_closed()
 
207
        if length is None:
 
208
            return self._file_base.read_all()
 
209
        else:
 
210
            return self._file_base.read_n(length)
 
211
 
 
212
    def read_to(self, sequence, size=None):
 
213
        """
 
214
        Read characters until a sequence is found, with optional max size.
 
215
        The specified sequence, if found, will be included in the result
 
216
 
 
217
        >>> f = IterableFile(['Th\\nis ', 'is \\n', 'a ', 'te\\nst.'])
 
218
        >>> f.read_to('i')
 
219
        'Th\\ni'
 
220
        >>> f.read_to('i')
 
221
        's i'
 
222
        >>> f.close()
 
223
        >>> f.read_to('i')
 
224
        Traceback (most recent call last):
 
225
        ValueError: File is closed.
 
226
        """
 
227
        self._check_closed()
 
228
        return self._file_base.read_to(sequence, size)
 
229
 
 
230
    def readline(self, size=None):
 
231
        """
 
232
        >>> f = IterableFile(['Th\\nis ', 'is \\n', 'a ', 'te\\nst.'])
 
233
        >>> f.readline()
 
234
        'Th\\n'
 
235
        >>> f.readline(4)
 
236
        'is i'
 
237
        >>> f.close()
 
238
        >>> f.readline()
 
239
        Traceback (most recent call last):
 
240
        ValueError: File is closed.
 
241
        """
 
242
        return self.read_to(b'\n', size)
 
243
 
 
244
    def readlines(self, sizehint=None):
 
245
        """
 
246
        >>> f = IterableFile(['Th\\nis ', 'is \\n', 'a ', 'te\\nst.'])
 
247
        >>> f.readlines()
 
248
        ['Th\\n', 'is is \\n', 'a te\\n', 'st.']
 
249
        >>> f = IterableFile(['Th\\nis ', 'is \\n', 'a ', 'te\\nst.'])
 
250
        >>> f.close()
 
251
        >>> f.readlines()
 
252
        Traceback (most recent call last):
 
253
        ValueError: File is closed.
 
254
        """
 
255
        lines = []
 
256
        while True:
 
257
            line = self.readline()
 
258
            if line == b"":
 
259
                return lines
 
260
            if sizehint is None:
 
261
                lines.append(line)
 
262
            elif len(line) < sizehint:
 
263
                lines.append(line)
 
264
                sizehint -= len(line)
 
265
            else:
 
266
                self._file_base.push_back(line)
 
267
                return lines
 
268
 
 
269
 
 
270
if __name__ == "__main__":
 
271
    import doctest
 
272
    doctest.testmod()