/brz/remove-bazaar

To get this branch, use:
bzr branch http://gegoxaren.bato24.eu/bzr/brz/remove-bazaar

« back to all changes in this revision

Viewing changes to breezy/iterablefile.py

  • Committer: Gustav Hartvigsson
  • Date: 2021-01-09 21:36:27 UTC
  • Revision ID: gustav.hartvigsson@gmail.com-20210109213627-h1xwcutzy9m7a99b
Added 'Case Preserving Working Tree Use Cases' from Canonical Wiki

* Addod a page from the Canonical Bazaar wiki
  with information on the scmeatics of case
  perserving filesystems an a case insensitive
  filesystem works.
  
  * Needs re-work, but this will do as it is the
    same inforamoton as what was on the linked
    page in the currint documentation.

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
# Copyright (C) 2005 Aaron Bentley, Canonical Ltd
 
2
# <aaron.bentley@utoronto.ca>
 
3
#
 
4
# This program is free software; you can redistribute it and/or modify
 
5
# it under the terms of the GNU General Public License as published by
 
6
# the Free Software Foundation; either version 2 of the License, or
 
7
# (at your option) any later version.
 
8
#
 
9
# This program is distributed in the hope that it will be useful,
 
10
# but WITHOUT ANY WARRANTY; without even the implied warranty of
 
11
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
12
# GNU General Public License for more details.
 
13
#
 
14
# You should have received a copy of the GNU General Public License
 
15
# along with this program; if not, write to the Free Software
 
16
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
 
17
 
 
18
 
 
19
class IterableFileBase(object):
 
20
    """Create a file-like object from any iterable"""
 
21
 
 
22
    def __init__(self, iterable):
 
23
        object.__init__(self)
 
24
        self._iter = iterable.__iter__()
 
25
        self._buffer = b""
 
26
        self.done = False
 
27
 
 
28
    def read_n(self, length):
 
29
        """
 
30
        >>> IterableFileBase([b'This ', b'is ', b'a ', b'test.']).read_n(8)
 
31
        b'This is '
 
32
        """
 
33
        def test_length(result):
 
34
            if len(result) >= length:
 
35
                return length
 
36
            else:
 
37
                return None
 
38
        return self._read(test_length)
 
39
 
 
40
    def read_to(self, sequence, length=None):
 
41
        """
 
42
        >>> f = IterableFileBase([b'Th\\nis ', b'is \\n', b'a ', b'te\\nst.'])
 
43
        >>> f.read_to(b'\\n')
 
44
        b'Th\\n'
 
45
        >>> f.read_to(b'\\n')
 
46
        b'is is \\n'
 
47
        """
 
48
        def test_contents(result):
 
49
            if length is not None:
 
50
                if len(result) >= length:
 
51
                    return length
 
52
            try:
 
53
                return result.index(sequence) + len(sequence)
 
54
            except ValueError:
 
55
                return None
 
56
        return self._read(test_contents)
 
57
 
 
58
    def _read(self, result_length):
 
59
        """
 
60
        Read data until result satisfies the condition result_length.
 
61
        result_length is a callable that returns None until the condition
 
62
        is satisfied, and returns the length of the result to use when
 
63
        the condition is satisfied.  (i.e. it returns the length of the
 
64
        subset of the first condition match.)
 
65
        """
 
66
        result = self._buffer
 
67
        while result_length(result) is None:
 
68
            try:
 
69
                result += next(self._iter)
 
70
            except StopIteration:
 
71
                self.done = True
 
72
                self._buffer = b""
 
73
                return result
 
74
        output_length = result_length(result)
 
75
        self._buffer = result[output_length:]
 
76
        return result[:output_length]
 
77
 
 
78
    def read_all(self):
 
79
        """
 
80
        >>> IterableFileBase([b'This ', b'is ', b'a ', b'test.']).read_all()
 
81
        b'This is a test.'
 
82
        """
 
83
        def no_stop(result):
 
84
            return None
 
85
        return self._read(no_stop)
 
86
 
 
87
    def push_back(self, contents):
 
88
        """
 
89
        >>> f = IterableFileBase([b'Th\\nis ', b'is \\n', b'a ', b'te\\nst.'])
 
90
        >>> f.read_to(b'\\n')
 
91
        b'Th\\n'
 
92
        >>> f.push_back(b"Sh")
 
93
        >>> f.read_all()
 
94
        b'Shis is \\na te\\nst.'
 
95
        """
 
96
        self._buffer = contents + self._buffer
 
97
 
 
98
 
 
99
class IterableFile(object):
 
100
    """This class supplies all File methods that can be implemented cheaply."""
 
101
 
 
102
    def __init__(self, iterable):
 
103
        object.__init__(self)
 
104
        self._file_base = IterableFileBase(iterable)
 
105
        self._iter = self._make_iterator()
 
106
        self._closed = False
 
107
        self.softspace = 0
 
108
 
 
109
    def _make_iterator(self):
 
110
        while not self._file_base.done:
 
111
            self._check_closed()
 
112
            result = self._file_base.read_to(b'\n')
 
113
            if result != b'':
 
114
                yield result
 
115
 
 
116
    def _check_closed(self):
 
117
        if self.closed:
 
118
            raise ValueError("File is closed.")
 
119
 
 
120
    def close(self):
 
121
        """
 
122
        >>> f = IterableFile([b'This ', b'is ', b'a ', b'test.'])
 
123
        >>> f.closed
 
124
        False
 
125
        >>> f.close()
 
126
        >>> f.closed
 
127
        True
 
128
        """
 
129
        self._file_base.done = True
 
130
        self._closed = True
 
131
 
 
132
    closed = property(lambda x: x._closed)
 
133
 
 
134
    def __enter__(self):
 
135
        return self
 
136
 
 
137
    def __exit__(self, exc_type, exc_val, exc_tb):
 
138
        # If there was an error raised, prefer the original one
 
139
        try:
 
140
            self.close()
 
141
        except BaseException:
 
142
            if exc_type is None:
 
143
                raise
 
144
        return False
 
145
 
 
146
    def flush(self):
 
147
        """No-op for standard compliance.
 
148
        >>> f = IterableFile([])
 
149
        >>> f.close()
 
150
        >>> f.flush()
 
151
        Traceback (most recent call last):
 
152
        ValueError: File is closed.
 
153
        """
 
154
        self._check_closed()
 
155
 
 
156
    def __next__(self):
 
157
        """Implementation of the iterator protocol's next()
 
158
 
 
159
        >>> f = IterableFile([b'This \\n', b'is ', b'a ', b'test.'])
 
160
        >>> next(f)
 
161
        b'This \\n'
 
162
        >>> f.close()
 
163
        >>> next(f)
 
164
        Traceback (most recent call last):
 
165
        ValueError: File is closed.
 
166
        >>> f = IterableFile([b'This \\n', b'is ', b'a ', b'test.\\n'])
 
167
        >>> next(f)
 
168
        b'This \\n'
 
169
        >>> next(f)
 
170
        b'is a test.\\n'
 
171
        >>> next(f)
 
172
        Traceback (most recent call last):
 
173
        StopIteration
 
174
        """
 
175
        self._check_closed()
 
176
        return next(self._iter)
 
177
 
 
178
    next = __next__
 
179
 
 
180
    def __iter__(self):
 
181
        """
 
182
        >>> list(IterableFile([b'Th\\nis ', b'is \\n', b'a ', b'te\\nst.']))
 
183
        [b'Th\\n', b'is is \\n', b'a te\\n', b'st.']
 
184
        >>> f = IterableFile([b'Th\\nis ', b'is \\n', b'a ', b'te\\nst.'])
 
185
        >>> f.close()
 
186
        >>> list(f)
 
187
        Traceback (most recent call last):
 
188
        ValueError: File is closed.
 
189
        """
 
190
        return self
 
191
 
 
192
    def read(self, length=None):
 
193
        """
 
194
        >>> IterableFile([b'This ', b'is ', b'a ', b'test.']).read()
 
195
        b'This is a test.'
 
196
        >>> f = IterableFile([b'This ', b'is ', b'a ', b'test.'])
 
197
        >>> f.read(10)
 
198
        b'This is a '
 
199
        >>> f = IterableFile([b'This ', b'is ', b'a ', b'test.'])
 
200
        >>> f.close()
 
201
        >>> f.read(10)
 
202
        Traceback (most recent call last):
 
203
        ValueError: File is closed.
 
204
        """
 
205
        self._check_closed()
 
206
        if length is None:
 
207
            return self._file_base.read_all()
 
208
        else:
 
209
            return self._file_base.read_n(length)
 
210
 
 
211
    def read_to(self, sequence, size=None):
 
212
        """
 
213
        Read characters until a sequence is found, with optional max size.
 
214
        The specified sequence, if found, will be included in the result
 
215
 
 
216
        >>> f = IterableFile([b'Th\\nis ', b'is \\n', b'a ', b'te\\nst.'])
 
217
        >>> f.read_to(b'i')
 
218
        b'Th\\ni'
 
219
        >>> f.read_to(b'i')
 
220
        b's i'
 
221
        >>> f.close()
 
222
        >>> f.read_to(b'i')
 
223
        Traceback (most recent call last):
 
224
        ValueError: File is closed.
 
225
        """
 
226
        self._check_closed()
 
227
        return self._file_base.read_to(sequence, size)
 
228
 
 
229
    def readline(self, size=None):
 
230
        """
 
231
        >>> f = IterableFile([b'Th\\nis ', b'is \\n', b'a ', b'te\\nst.'])
 
232
        >>> f.readline()
 
233
        b'Th\\n'
 
234
        >>> f.readline(4)
 
235
        b'is i'
 
236
        >>> f.close()
 
237
        >>> f.readline()
 
238
        Traceback (most recent call last):
 
239
        ValueError: File is closed.
 
240
        """
 
241
        return self.read_to(b'\n', size)
 
242
 
 
243
    def readlines(self, sizehint=None):
 
244
        """
 
245
        >>> f = IterableFile([b'Th\\nis ', b'is \\n', b'a ', b'te\\nst.'])
 
246
        >>> f.readlines()
 
247
        [b'Th\\n', b'is is \\n', b'a te\\n', b'st.']
 
248
        >>> f = IterableFile([b'Th\\nis ', b'is \\n', b'a ', b'te\\nst.'])
 
249
        >>> f.close()
 
250
        >>> f.readlines()
 
251
        Traceback (most recent call last):
 
252
        ValueError: File is closed.
 
253
        """
 
254
        lines = []
 
255
        while True:
 
256
            line = self.readline()
 
257
            if line == b"":
 
258
                return lines
 
259
            if sizehint is None:
 
260
                lines.append(line)
 
261
            elif len(line) < sizehint:
 
262
                lines.append(line)
 
263
                sizehint -= len(line)
 
264
            else:
 
265
                self._file_base.push_back(line)
 
266
                return lines
 
267
 
 
268
 
 
269
if __name__ == "__main__":
 
270
    import doctest
 
271
    doctest.testmod()