/brz/remove-bazaar

To get this branch, use:
bzr branch http://gegoxaren.bato24.eu/bzr/brz/remove-bazaar

« back to all changes in this revision

Viewing changes to breezy/iterablefile.py

  • Committer: Jelmer Vernooij
  • Date: 2017-06-08 23:30:31 UTC
  • mto: This revision was merged to the branch mainline in revision 6690.
  • Revision ID: jelmer@jelmer.uk-20170608233031-3qavls2o7a1pqllj
Update imports.

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
# Copyright (C) 2005 Aaron Bentley, Canonical Ltd
 
2
# <aaron.bentley@utoronto.ca>
 
3
#
 
4
# This program is free software; you can redistribute it and/or modify
 
5
# it under the terms of the GNU General Public License as published by
 
6
# the Free Software Foundation; either version 2 of the License, or
 
7
# (at your option) any later version.
 
8
#
 
9
# This program is distributed in the hope that it will be useful,
 
10
# but WITHOUT ANY WARRANTY; without even the implied warranty of
 
11
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
12
# GNU General Public License for more details.
 
13
#
 
14
# You should have received a copy of the GNU General Public License
 
15
# along with this program; if not, write to the Free Software
 
16
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
 
17
 
 
18
from __future__ import absolute_import
 
19
 
 
20
class IterableFileBase(object):
 
21
    """Create a file-like object from any iterable"""
 
22
 
 
23
    def __init__(self, iterable):
 
24
        object.__init__(self)
 
25
        self._iter = iterable.__iter__()
 
26
        self._buffer = ""
 
27
        self.done = False
 
28
 
 
29
    def read_n(self, length):
 
30
        """
 
31
        >>> IterableFileBase(['This ', 'is ', 'a ', 'test.']).read_n(8)
 
32
        'This is '
 
33
        """
 
34
        def test_length(result):
 
35
            if len(result) >= length:
 
36
                return length
 
37
            else:
 
38
                return None
 
39
        return self._read(test_length)
 
40
 
 
41
    def read_to(self, sequence, length=None):
 
42
        """
 
43
        >>> f = IterableFileBase(['Th\\nis ', 'is \\n', 'a ', 'te\\nst.'])
 
44
        >>> f.read_to('\\n')
 
45
        'Th\\n'
 
46
        >>> f.read_to('\\n')
 
47
        'is is \\n'
 
48
        """
 
49
        def test_contents(result):
 
50
            if length is not None:
 
51
                if len(result) >= length:
 
52
                    return length
 
53
            try:
 
54
                return result.index(sequence)+len(sequence)
 
55
            except ValueError:
 
56
                return None
 
57
        return self._read(test_contents)
 
58
 
 
59
    def _read(self, result_length):
 
60
        """
 
61
        Read data until result satisfies the condition result_length.
 
62
        result_length is a callable that returns None until the condition
 
63
        is satisfied, and returns the length of the result to use when
 
64
        the condition is satisfied.  (i.e. it returns the length of the
 
65
        subset of the first condition match.)
 
66
        """
 
67
        result = self._buffer
 
68
        while result_length(result) is None:
 
69
            try:
 
70
                result += next(self._iter)
 
71
            except StopIteration:
 
72
                self.done = True
 
73
                self._buffer = ""
 
74
                return result
 
75
        output_length = result_length(result)
 
76
        self._buffer = result[output_length:]
 
77
        return result[:output_length]
 
78
 
 
79
    def read_all(self):
 
80
        """
 
81
        >>> IterableFileBase(['This ', 'is ', 'a ', 'test.']).read_all()
 
82
        'This is a test.'
 
83
        """
 
84
        def no_stop(result):
 
85
            return None
 
86
        return self._read(no_stop)
 
87
 
 
88
 
 
89
    def push_back(self, contents):
 
90
        """
 
91
        >>> f = IterableFileBase(['Th\\nis ', 'is \\n', 'a ', 'te\\nst.'])
 
92
        >>> f.read_to('\\n')
 
93
        'Th\\n'
 
94
        >>> f.push_back("Sh")
 
95
        >>> f.read_all()
 
96
        'Shis is \\na te\\nst.'
 
97
        """
 
98
        self._buffer = contents + self._buffer
 
99
 
 
100
 
 
101
class IterableFile(object):
 
102
    """This class supplies all File methods that can be implemented cheaply."""
 
103
    def __init__(self, iterable):
 
104
        object.__init__(self)
 
105
        self._file_base = IterableFileBase(iterable)
 
106
        self._iter = self._make_iterator()
 
107
        self._closed = False
 
108
        self.softspace = 0
 
109
 
 
110
    def _make_iterator(self):
 
111
        while not self._file_base.done:
 
112
            self._check_closed()
 
113
            result = self._file_base.read_to('\n')
 
114
            if result != '':
 
115
                yield result
 
116
 
 
117
    def _check_closed(self):
 
118
        if self.closed:
 
119
            raise ValueError("File is closed.")
 
120
 
 
121
    def close(self):
 
122
        """
 
123
        >>> f = IterableFile(['This ', 'is ', 'a ', 'test.'])
 
124
        >>> f.closed
 
125
        False
 
126
        >>> f.close()
 
127
        >>> f.closed
 
128
        True
 
129
        """
 
130
        self._file_base.done = True
 
131
        self._closed = True
 
132
 
 
133
    closed = property(lambda x: x._closed)
 
134
 
 
135
    def flush(self):
 
136
        """No-op for standard compliance.
 
137
        >>> f = IterableFile([])
 
138
        >>> f.close()
 
139
        >>> f.flush()
 
140
        Traceback (most recent call last):
 
141
        ValueError: File is closed.
 
142
        """
 
143
        self._check_closed()
 
144
 
 
145
    def __next__(self):
 
146
        """Implementation of the iterator protocol's next()
 
147
 
 
148
        >>> f = IterableFile(['This \\n', 'is ', 'a ', 'test.'])
 
149
        >>> next(f)
 
150
        'This \\n'
 
151
        >>> f.close()
 
152
        >>> next(f)
 
153
        Traceback (most recent call last):
 
154
        ValueError: File is closed.
 
155
        >>> f = IterableFile(['This \\n', 'is ', 'a ', 'test.\\n'])
 
156
        >>> next(f)
 
157
        'This \\n'
 
158
        >>> next(f)
 
159
        'is a test.\\n'
 
160
        >>> next(f)
 
161
        Traceback (most recent call last):
 
162
        StopIteration
 
163
        """
 
164
        self._check_closed()
 
165
        return next(self._iter)
 
166
 
 
167
    next = __next__
 
168
 
 
169
    def __iter__(self):
 
170
        """
 
171
        >>> list(IterableFile(['Th\\nis ', 'is \\n', 'a ', 'te\\nst.']))
 
172
        ['Th\\n', 'is is \\n', 'a te\\n', 'st.']
 
173
        >>> f = IterableFile(['Th\\nis ', 'is \\n', 'a ', 'te\\nst.'])
 
174
        >>> f.close()
 
175
        >>> list(f)
 
176
        Traceback (most recent call last):
 
177
        ValueError: File is closed.
 
178
        """
 
179
        return self
 
180
 
 
181
    def read(self, length=None):
 
182
        """
 
183
        >>> IterableFile(['This ', 'is ', 'a ', 'test.']).read()
 
184
        'This is a test.'
 
185
        >>> f = IterableFile(['This ', 'is ', 'a ', 'test.'])
 
186
        >>> f.read(10)
 
187
        'This is a '
 
188
        >>> f = IterableFile(['This ', 'is ', 'a ', 'test.'])
 
189
        >>> f.close()
 
190
        >>> f.read(10)
 
191
        Traceback (most recent call last):
 
192
        ValueError: File is closed.
 
193
        """
 
194
        self._check_closed()
 
195
        if length is None:
 
196
            return self._file_base.read_all()
 
197
        else:
 
198
            return self._file_base.read_n(length)
 
199
 
 
200
    def read_to(self, sequence, size=None):
 
201
        """
 
202
        Read characters until a sequence is found, with optional max size.
 
203
        The specified sequence, if found, will be included in the result
 
204
 
 
205
        >>> f = IterableFile(['Th\\nis ', 'is \\n', 'a ', 'te\\nst.'])
 
206
        >>> f.read_to('i')
 
207
        'Th\\ni'
 
208
        >>> f.read_to('i')
 
209
        's i'
 
210
        >>> f.close()
 
211
        >>> f.read_to('i')
 
212
        Traceback (most recent call last):
 
213
        ValueError: File is closed.
 
214
        """
 
215
        self._check_closed()
 
216
        return self._file_base.read_to(sequence, size)
 
217
 
 
218
    def readline(self, size=None):
 
219
        """
 
220
        >>> f = IterableFile(['Th\\nis ', 'is \\n', 'a ', 'te\\nst.'])
 
221
        >>> f.readline()
 
222
        'Th\\n'
 
223
        >>> f.readline(4)
 
224
        'is i'
 
225
        >>> f.close()
 
226
        >>> f.readline()
 
227
        Traceback (most recent call last):
 
228
        ValueError: File is closed.
 
229
        """
 
230
        return self.read_to('\n', size)
 
231
 
 
232
    def readlines(self, sizehint=None):
 
233
        """
 
234
        >>> f = IterableFile(['Th\\nis ', 'is \\n', 'a ', 'te\\nst.'])
 
235
        >>> f.readlines()
 
236
        ['Th\\n', 'is is \\n', 'a te\\n', 'st.']
 
237
        >>> f = IterableFile(['Th\\nis ', 'is \\n', 'a ', 'te\\nst.'])
 
238
        >>> f.close()
 
239
        >>> f.readlines()
 
240
        Traceback (most recent call last):
 
241
        ValueError: File is closed.
 
242
        """
 
243
        lines = []
 
244
        while True:
 
245
            line = self.readline()
 
246
            if line == "":
 
247
                return lines
 
248
            if sizehint is None:
 
249
                lines.append(line)
 
250
            elif len(line) < sizehint:
 
251
                lines.append(line)
 
252
                sizehint -= len(line)
 
253
            else:
 
254
                self._file_base.push_back(line)
 
255
                return lines
 
256
 
 
257
 
 
258
if __name__ == "__main__":
 
259
    import doctest
 
260
    doctest.testmod()