/brz/remove-bazaar

To get this branch, use:
bzr branch http://gegoxaren.bato24.eu/bzr/brz/remove-bazaar

« back to all changes in this revision

Viewing changes to bzrlib/iterablefile.py

  • Committer: Robert Collins
  • Date: 2007-04-19 02:27:44 UTC
  • mto: This revision was merged to the branch mainline in revision 2426.
  • Revision ID: robertc@robertcollins.net-20070419022744-pfdqz42kp1wizh43
``make docs`` now creates a man page at ``man1/bzr.1`` fixing bug 107388.
(Robert Collins)

Show diffs side-by-side

added added

removed removed

Lines of Context:
13
13
#
14
14
# You should have received a copy of the GNU General Public License
15
15
# along with this program; if not, write to the Free Software
16
 
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
 
16
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
17
17
 
18
18
 
19
19
class IterableFileBase(object):
22
22
    def __init__(self, iterable):
23
23
        object.__init__(self)
24
24
        self._iter = iterable.__iter__()
25
 
        self._buffer = b""
 
25
        self._buffer = ""
26
26
        self.done = False
27
27
 
28
28
    def read_n(self, length):
29
29
        """
30
 
        >>> IterableFileBase([b'This ', b'is ', b'a ', b'test.']).read_n(8)
31
 
        b'This is '
 
30
        >>> IterableFileBase(['This ', 'is ', 'a ', 'test.']).read_n(8)
 
31
        'This is '
32
32
        """
33
33
        def test_length(result):
34
34
            if len(result) >= length:
39
39
 
40
40
    def read_to(self, sequence, length=None):
41
41
        """
42
 
        >>> f = IterableFileBase([b'Th\\nis ', b'is \\n', b'a ', b'te\\nst.'])
43
 
        >>> f.read_to(b'\\n')
44
 
        b'Th\\n'
45
 
        >>> f.read_to(b'\\n')
46
 
        b'is is \\n'
 
42
        >>> f = IterableFileBase(['Th\\nis ', 'is \\n', 'a ', 'te\\nst.'])
 
43
        >>> f.read_to('\\n')
 
44
        'Th\\n'
 
45
        >>> f.read_to('\\n')
 
46
        'is is \\n'
47
47
        """
48
48
        def test_contents(result):
49
49
            if length is not None:
50
50
                if len(result) >= length:
51
51
                    return length
52
52
            try:
53
 
                return result.index(sequence) + len(sequence)
 
53
                return result.index(sequence)+len(sequence)
54
54
            except ValueError:
55
55
                return None
56
56
        return self._read(test_contents)
66
66
        result = self._buffer
67
67
        while result_length(result) is None:
68
68
            try:
69
 
                result += next(self._iter)
 
69
                result += self._iter.next()
70
70
            except StopIteration:
71
71
                self.done = True
72
 
                self._buffer = b""
 
72
                self._buffer = ""
73
73
                return result
74
74
        output_length = result_length(result)
75
75
        self._buffer = result[output_length:]
77
77
 
78
78
    def read_all(self):
79
79
        """
80
 
        >>> IterableFileBase([b'This ', b'is ', b'a ', b'test.']).read_all()
81
 
        b'This is a test.'
 
80
        >>> IterableFileBase(['This ', 'is ', 'a ', 'test.']).read_all()
 
81
        'This is a test.'
82
82
        """
83
83
        def no_stop(result):
84
84
            return None
85
85
        return self._read(no_stop)
86
86
 
 
87
 
87
88
    def push_back(self, contents):
88
89
        """
89
 
        >>> f = IterableFileBase([b'Th\\nis ', b'is \\n', b'a ', b'te\\nst.'])
90
 
        >>> f.read_to(b'\\n')
91
 
        b'Th\\n'
92
 
        >>> f.push_back(b"Sh")
 
90
        >>> f = IterableFileBase(['Th\\nis ', 'is \\n', 'a ', 'te\\nst.'])
 
91
        >>> f.read_to('\\n')
 
92
        'Th\\n'
 
93
        >>> f.push_back("Sh")
93
94
        >>> f.read_all()
94
 
        b'Shis is \\na te\\nst.'
 
95
        'Shis is \\na te\\nst.'
95
96
        """
96
97
        self._buffer = contents + self._buffer
97
98
 
98
99
 
99
100
class IterableFile(object):
100
101
    """This class supplies all File methods that can be implemented cheaply."""
101
 
 
102
102
    def __init__(self, iterable):
103
103
        object.__init__(self)
104
104
        self._file_base = IterableFileBase(iterable)
109
109
    def _make_iterator(self):
110
110
        while not self._file_base.done:
111
111
            self._check_closed()
112
 
            result = self._file_base.read_to(b'\n')
113
 
            if result != b'':
 
112
            result = self._file_base.read_to('\n')
 
113
            if result != '':
114
114
                yield result
115
115
 
116
116
    def _check_closed(self):
119
119
 
120
120
    def close(self):
121
121
        """
122
 
        >>> f = IterableFile([b'This ', b'is ', b'a ', b'test.'])
 
122
        >>> f = IterableFile(['This ', 'is ', 'a ', 'test.'])
123
123
        >>> f.closed
124
124
        False
125
125
        >>> f.close()
131
131
 
132
132
    closed = property(lambda x: x._closed)
133
133
 
134
 
    def __enter__(self):
135
 
        return self
136
 
 
137
 
    def __exit__(self, exc_type, exc_val, exc_tb):
138
 
        # If there was an error raised, prefer the original one
139
 
        try:
140
 
            self.close()
141
 
        except BaseException:
142
 
            if exc_type is None:
143
 
                raise
144
 
        return False
145
 
 
146
134
    def flush(self):
147
135
        """No-op for standard compliance.
148
136
        >>> f = IterableFile([])
153
141
        """
154
142
        self._check_closed()
155
143
 
156
 
    def __next__(self):
 
144
    def next(self):
157
145
        """Implementation of the iterator protocol's next()
158
146
 
159
 
        >>> f = IterableFile([b'This \\n', b'is ', b'a ', b'test.'])
160
 
        >>> next(f)
161
 
        b'This \\n'
 
147
        >>> f = IterableFile(['This \\n', 'is ', 'a ', 'test.'])
 
148
        >>> f.next()
 
149
        'This \\n'
162
150
        >>> f.close()
163
 
        >>> next(f)
 
151
        >>> f.next()
164
152
        Traceback (most recent call last):
165
153
        ValueError: File is closed.
166
 
        >>> f = IterableFile([b'This \\n', b'is ', b'a ', b'test.\\n'])
167
 
        >>> next(f)
168
 
        b'This \\n'
169
 
        >>> next(f)
170
 
        b'is a test.\\n'
171
 
        >>> next(f)
 
154
        >>> f = IterableFile(['This \\n', 'is ', 'a ', 'test.\\n'])
 
155
        >>> f.next()
 
156
        'This \\n'
 
157
        >>> f.next()
 
158
        'is a test.\\n'
 
159
        >>> f.next()
172
160
        Traceback (most recent call last):
173
161
        StopIteration
174
162
        """
175
163
        self._check_closed()
176
 
        return next(self._iter)
177
 
 
178
 
    next = __next__
 
164
        return self._iter.next()
179
165
 
180
166
    def __iter__(self):
181
167
        """
182
 
        >>> list(IterableFile([b'Th\\nis ', b'is \\n', b'a ', b'te\\nst.']))
183
 
        [b'Th\\n', b'is is \\n', b'a te\\n', b'st.']
184
 
        >>> f = IterableFile([b'Th\\nis ', b'is \\n', b'a ', b'te\\nst.'])
 
168
        >>> list(IterableFile(['Th\\nis ', 'is \\n', 'a ', 'te\\nst.']))
 
169
        ['Th\\n', 'is is \\n', 'a te\\n', 'st.']
 
170
        >>> f = IterableFile(['Th\\nis ', 'is \\n', 'a ', 'te\\nst.'])
185
171
        >>> f.close()
186
172
        >>> list(f)
187
173
        Traceback (most recent call last):
191
177
 
192
178
    def read(self, length=None):
193
179
        """
194
 
        >>> IterableFile([b'This ', b'is ', b'a ', b'test.']).read()
195
 
        b'This is a test.'
196
 
        >>> f = IterableFile([b'This ', b'is ', b'a ', b'test.'])
 
180
        >>> IterableFile(['This ', 'is ', 'a ', 'test.']).read()
 
181
        'This is a test.'
 
182
        >>> f = IterableFile(['This ', 'is ', 'a ', 'test.'])
197
183
        >>> f.read(10)
198
 
        b'This is a '
199
 
        >>> f = IterableFile([b'This ', b'is ', b'a ', b'test.'])
 
184
        'This is a '
 
185
        >>> f = IterableFile(['This ', 'is ', 'a ', 'test.'])
200
186
        >>> f.close()
201
187
        >>> f.read(10)
202
188
        Traceback (most recent call last):
213
199
        Read characters until a sequence is found, with optional max size.
214
200
        The specified sequence, if found, will be included in the result
215
201
 
216
 
        >>> f = IterableFile([b'Th\\nis ', b'is \\n', b'a ', b'te\\nst.'])
217
 
        >>> f.read_to(b'i')
218
 
        b'Th\\ni'
219
 
        >>> f.read_to(b'i')
220
 
        b's i'
 
202
        >>> f = IterableFile(['Th\\nis ', 'is \\n', 'a ', 'te\\nst.'])
 
203
        >>> f.read_to('i')
 
204
        'Th\\ni'
 
205
        >>> f.read_to('i')
 
206
        's i'
221
207
        >>> f.close()
222
 
        >>> f.read_to(b'i')
 
208
        >>> f.read_to('i')
223
209
        Traceback (most recent call last):
224
210
        ValueError: File is closed.
225
211
        """
228
214
 
229
215
    def readline(self, size=None):
230
216
        """
231
 
        >>> f = IterableFile([b'Th\\nis ', b'is \\n', b'a ', b'te\\nst.'])
 
217
        >>> f = IterableFile(['Th\\nis ', 'is \\n', 'a ', 'te\\nst.'])
232
218
        >>> f.readline()
233
 
        b'Th\\n'
 
219
        'Th\\n'
234
220
        >>> f.readline(4)
235
 
        b'is i'
 
221
        'is i'
236
222
        >>> f.close()
237
223
        >>> f.readline()
238
224
        Traceback (most recent call last):
239
225
        ValueError: File is closed.
240
226
        """
241
 
        return self.read_to(b'\n', size)
 
227
        return self.read_to('\n', size)
242
228
 
243
229
    def readlines(self, sizehint=None):
244
230
        """
245
 
        >>> f = IterableFile([b'Th\\nis ', b'is \\n', b'a ', b'te\\nst.'])
 
231
        >>> f = IterableFile(['Th\\nis ', 'is \\n', 'a ', 'te\\nst.'])
246
232
        >>> f.readlines()
247
 
        [b'Th\\n', b'is is \\n', b'a te\\n', b'st.']
248
 
        >>> f = IterableFile([b'Th\\nis ', b'is \\n', b'a ', b'te\\nst.'])
 
233
        ['Th\\n', 'is is \\n', 'a te\\n', 'st.']
 
234
        >>> f = IterableFile(['Th\\nis ', 'is \\n', 'a ', 'te\\nst.'])
249
235
        >>> f.close()
250
236
        >>> f.readlines()
251
237
        Traceback (most recent call last):
254
240
        lines = []
255
241
        while True:
256
242
            line = self.readline()
257
 
            if line == b"":
 
243
            if line == "":
258
244
                return lines
259
245
            if sizehint is None:
260
246
                lines.append(line)
265
251
                self._file_base.push_back(line)
266
252
                return lines
267
253
 
268
 
 
 
254
        
269
255
if __name__ == "__main__":
270
256
    import doctest
271
257
    doctest.testmod()