/brz/remove-bazaar

To get this branch, use:
bzr branch http://gegoxaren.bato24.eu/bzr/brz/remove-bazaar

« back to all changes in this revision

Viewing changes to breezy/rio.py

  • Committer: Breezy landing bot
  • Author(s): Jelmer Vernooij
  • Date: 2018-11-16 18:26:22 UTC
  • mfrom: (7167.1.4 run-flake8)
  • Revision ID: breezy.the.bot@gmail.com-20181116182622-qw3gan3hz78a2imw
Add a flake8 test.

Merged from https://code.launchpad.net/~jelmer/brz/run-flake8/+merge/358902

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
# Copyright (C) 2005 Canonical Ltd
 
2
#
 
3
# This program is free software; you can redistribute it and/or modify
 
4
# it under the terms of the GNU General Public License as published by
 
5
# the Free Software Foundation; either version 2 of the License, or
 
6
# (at your option) any later version.
 
7
#
 
8
# This program is distributed in the hope that it will be useful,
 
9
# but WITHOUT ANY WARRANTY; without even the implied warranty of
 
10
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
11
# GNU General Public License for more details.
 
12
#
 
13
# You should have received a copy of the GNU General Public License
 
14
# along with this program; if not, write to the Free Software
 
15
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
 
16
 
 
17
from __future__ import absolute_import
 
18
 
 
19
# \subsection{\emph{rio} - simple text metaformat}
 
20
#
 
21
# \emph{r} stands for `restricted', `reproducible', or `rfc822-like'.
 
22
#
 
23
# The stored data consists of a series of \emph{stanzas}, each of which contains
 
24
# \emph{fields} identified by an ascii name, with Unicode or string contents.
 
25
# The field tag is constrained to alphanumeric characters.
 
26
# There may be more than one field in a stanza with the same name.
 
27
#
 
28
# The format itself does not deal with character encoding issues, though
 
29
# the result will normally be written in Unicode.
 
30
#
 
31
# The format is intended to be simple enough that there is exactly one character
 
32
# stream representation of an object and vice versa, and that this relation
 
33
# will continue to hold for future versions of bzr.
 
34
 
 
35
import re
 
36
 
 
37
from . import osutils
 
38
from .iterablefile import IterableFile
 
39
from .sixish import (
 
40
    text_type,
 
41
    )
 
42
 
 
43
# XXX: some redundancy is allowing to write stanzas in isolation as well as
 
44
# through a writer object.
 
45
 
 
46
class RioWriter(object):
 
47
 
 
48
    def __init__(self, to_file):
 
49
        self._soft_nl = False
 
50
        self._to_file = to_file
 
51
 
 
52
    def write_stanza(self, stanza):
 
53
        if self._soft_nl:
 
54
            self._to_file.write(b'\n')
 
55
        stanza.write(self._to_file)
 
56
        self._soft_nl = True
 
57
 
 
58
 
 
59
class RioReader(object):
 
60
    """Read stanzas from a file as a sequence
 
61
 
 
62
    to_file can be anything that can be enumerated as a sequence of
 
63
    lines (with newlines.)
 
64
    """
 
65
    def __init__(self, from_file):
 
66
        self._from_file = from_file
 
67
 
 
68
    def __iter__(self):
 
69
        while True:
 
70
            s = read_stanza(self._from_file)
 
71
            if s is None:
 
72
                break
 
73
            else:
 
74
                yield s
 
75
 
 
76
 
 
77
def rio_file(stanzas, header=None):
 
78
    """Produce a rio IterableFile from an iterable of stanzas"""
 
79
    def str_iter():
 
80
        if header is not None:
 
81
            yield header + b'\n'
 
82
        first_stanza = True
 
83
        for s in stanzas:
 
84
            if first_stanza is not True:
 
85
                yield b'\n'
 
86
            for line in s.to_lines():
 
87
                yield line
 
88
            first_stanza = False
 
89
    return IterableFile(str_iter())
 
90
 
 
91
 
 
92
def read_stanzas(from_file):
 
93
 
 
94
    while True:
 
95
        s = read_stanza(from_file)
 
96
        if s is None:
 
97
            break
 
98
        yield s
 
99
 
 
100
 
 
101
def read_stanzas_unicode(from_file):
 
102
 
 
103
    while True:
 
104
        s = read_stanza_unicode(from_file)
 
105
        if s is None:
 
106
            break
 
107
        yield s
 
108
 
 
109
 
 
110
class Stanza(object):
 
111
    """One stanza for rio.
 
112
 
 
113
    Each stanza contains a set of named fields.
 
114
 
 
115
    Names must be non-empty ascii alphanumeric plus _.  Names can be repeated
 
116
    within a stanza.  Names are case-sensitive.  The ordering of fields is
 
117
    preserved.
 
118
 
 
119
    Each field value must be either an int or a string.
 
120
    """
 
121
 
 
122
    __slots__ = ['items']
 
123
 
 
124
    def __init__(self, **kwargs):
 
125
        """Construct a new Stanza.
 
126
 
 
127
        The keyword arguments, if any, are added in sorted order to the stanza.
 
128
        """
 
129
        self.items = []
 
130
        if kwargs:
 
131
            for tag, value in sorted(kwargs.items()):
 
132
                self.add(tag, value)
 
133
 
 
134
    def add(self, tag, value):
 
135
        """Append a name and value to the stanza."""
 
136
        if not valid_tag(tag):
 
137
            raise ValueError("invalid tag %r" % (tag,))
 
138
        if isinstance(value, bytes):
 
139
            value = value.decode('ascii')
 
140
        elif isinstance(value, text_type):
 
141
            pass
 
142
        else:
 
143
            raise TypeError("invalid type for rio value: %r of type %s"
 
144
                            % (value, type(value)))
 
145
        self.items.append((tag, value))
 
146
 
 
147
    @classmethod
 
148
    def from_pairs(cls, pairs):
 
149
        ret = cls()
 
150
        ret.items = pairs
 
151
        return ret
 
152
 
 
153
    def __contains__(self, find_tag):
 
154
        """True if there is any field in this stanza with the given tag."""
 
155
        for tag, value in self.items:
 
156
            if tag == find_tag:
 
157
                return True
 
158
        return False
 
159
 
 
160
    def __len__(self):
 
161
        """Return number of pairs in the stanza."""
 
162
        return len(self.items)
 
163
 
 
164
    def __eq__(self, other):
 
165
        if not isinstance(other, Stanza):
 
166
            return False
 
167
        return self.items == other.items
 
168
 
 
169
    def __ne__(self, other):
 
170
        return not self.__eq__(other)
 
171
 
 
172
    def __repr__(self):
 
173
        return "Stanza(%r)" % self.items
 
174
 
 
175
    def iter_pairs(self):
 
176
        """Return iterator of tag, value pairs."""
 
177
        return iter(self.items)
 
178
 
 
179
    def to_lines(self):
 
180
        """Generate sequence of lines for external version of this file.
 
181
 
 
182
        The lines are always utf-8 encoded strings.
 
183
        """
 
184
        if not self.items:
 
185
            # max() complains if sequence is empty
 
186
            return []
 
187
        result = []
 
188
        for text_tag, text_value in self.items:
 
189
            tag = text_tag.encode('ascii')
 
190
            value = text_value.encode('utf-8')
 
191
            if value == b'':
 
192
                result.append(tag + b': \n')
 
193
            elif b'\n' in value:
 
194
                # don't want splitlines behaviour on empty lines
 
195
                val_lines = value.split(b'\n')
 
196
                result.append(tag + b': ' + val_lines[0] + b'\n')
 
197
                for line in val_lines[1:]:
 
198
                    result.append(b'\t' + line + b'\n')
 
199
            else:
 
200
                result.append(tag + b': ' + value + b'\n')
 
201
        return result
 
202
 
 
203
    def to_string(self):
 
204
        """Return stanza as a single string"""
 
205
        return b''.join(self.to_lines())
 
206
 
 
207
    def to_unicode(self):
 
208
        """Return stanza as a single Unicode string.
 
209
 
 
210
        This is most useful when adding a Stanza to a parent Stanza
 
211
        """
 
212
        if not self.items:
 
213
            return u''
 
214
 
 
215
        result = []
 
216
        for tag, value in self.items:
 
217
            if value == u'':
 
218
                result.append(tag + u': \n')
 
219
            elif u'\n' in value:
 
220
                # don't want splitlines behaviour on empty lines
 
221
                val_lines = value.split(u'\n')
 
222
                result.append(tag + u': ' + val_lines[0] + u'\n')
 
223
                for line in val_lines[1:]:
 
224
                    result.append(u'\t' + line + u'\n')
 
225
            else:
 
226
                result.append(tag + u': ' + value + u'\n')
 
227
        return u''.join(result)
 
228
 
 
229
    def write(self, to_file):
 
230
        """Write stanza to a file"""
 
231
        to_file.writelines(self.to_lines())
 
232
 
 
233
    def get(self, tag):
 
234
        """Return the value for a field wih given tag.
 
235
 
 
236
        If there is more than one value, only the first is returned.  If the
 
237
        tag is not present, KeyError is raised.
 
238
        """
 
239
        for t, v in self.items:
 
240
            if t == tag:
 
241
                return v
 
242
        else:
 
243
            raise KeyError(tag)
 
244
 
 
245
    __getitem__ = get
 
246
 
 
247
    def get_all(self, tag):
 
248
        r = []
 
249
        for t, v in self.items:
 
250
            if t == tag:
 
251
                r.append(v)
 
252
        return r
 
253
 
 
254
    def as_dict(self):
 
255
        """Return a dict containing the unique values of the stanza.
 
256
        """
 
257
        d = {}
 
258
        for tag, value in self.items:
 
259
            d[tag] = value
 
260
        return d
 
261
 
 
262
 
 
263
def valid_tag(tag):
 
264
    return _valid_tag(tag)
 
265
 
 
266
 
 
267
def read_stanza(line_iter):
 
268
    """Return new Stanza read from list of lines or a file
 
269
 
 
270
    Returns one Stanza that was read, or returns None at end of file.  If a
 
271
    blank line follows the stanza, it is consumed.  It's not an error for
 
272
    there to be no blank at end of file.  If there is a blank file at the
 
273
    start of the input this is really an empty stanza and that is returned.
 
274
 
 
275
    Only the stanza lines and the trailing blank (if any) are consumed
 
276
    from the line_iter.
 
277
 
 
278
    The raw lines must be in utf-8 encoding.
 
279
    """
 
280
    return _read_stanza_utf8(line_iter)
 
281
 
 
282
 
 
283
def read_stanza_unicode(unicode_iter):
 
284
    """Read a Stanza from a list of lines or a file.
 
285
 
 
286
    The lines should already be in unicode form. This returns a single
 
287
    stanza that was read. If there is a blank line at the end of the Stanza,
 
288
    it is consumed. It is not an error for there to be no blank line at
 
289
    the end of the iterable. If there is a blank line at the beginning,
 
290
    this is treated as an empty Stanza and None is returned.
 
291
 
 
292
    Only the stanza lines and the trailing blank (if any) are consumed
 
293
    from the unicode_iter
 
294
 
 
295
    :param unicode_iter: A iterable, yeilding Unicode strings. See read_stanza
 
296
        if you have a utf-8 encoded string.
 
297
    :return: A Stanza object if there are any lines in the file.
 
298
        None otherwise
 
299
    """
 
300
    return _read_stanza_unicode(unicode_iter)
 
301
 
 
302
 
 
303
def to_patch_lines(stanza, max_width=72):
 
304
    """Convert a stanza into RIO-Patch format lines.
 
305
 
 
306
    RIO-Patch is a RIO variant designed to be e-mailed as part of a patch.
 
307
    It resists common forms of damage such as newline conversion or the removal
 
308
    of trailing whitespace, yet is also reasonably easy to read.
 
309
 
 
310
    :param max_width: The maximum number of characters per physical line.
 
311
    :return: a list of lines
 
312
    """
 
313
    if max_width <= 6:
 
314
        raise ValueError(max_width)
 
315
    max_rio_width = max_width - 4
 
316
    lines = []
 
317
    for pline in stanza.to_lines():
 
318
        for line in pline.split(b'\n')[:-1]:
 
319
            line = re.sub(b'\\\\', b'\\\\\\\\', line)
 
320
            while len(line) > 0:
 
321
                partline = line[:max_rio_width]
 
322
                line = line[max_rio_width:]
 
323
                if len(line) > 0 and line[:1] != [b' ']:
 
324
                    break_index = -1
 
325
                    break_index = partline.rfind(b' ', -20)
 
326
                    if break_index < 3:
 
327
                        break_index = partline.rfind(b'-', -20)
 
328
                        break_index += 1
 
329
                    if break_index < 3:
 
330
                        break_index = partline.rfind(b'/', -20)
 
331
                    if break_index >= 3:
 
332
                        line = partline[break_index:] + line
 
333
                        partline = partline[:break_index]
 
334
                if len(line) > 0:
 
335
                    line = b'  ' + line
 
336
                partline = re.sub(b'\r', b'\\\\r', partline)
 
337
                blank_line = False
 
338
                if len(line) > 0:
 
339
                    partline += b'\\'
 
340
                elif re.search(b' $', partline):
 
341
                    partline += b'\\'
 
342
                    blank_line = True
 
343
                lines.append(b'# ' + partline + b'\n')
 
344
                if blank_line:
 
345
                    lines.append(b'#   \n')
 
346
    return lines
 
347
 
 
348
 
 
349
def _patch_stanza_iter(line_iter):
 
350
    map = {b'\\\\': b'\\',
 
351
           b'\\r' : b'\r',
 
352
           b'\\\n': b''}
 
353
    def mapget(match):
 
354
        return map[match.group(0)]
 
355
 
 
356
    last_line = None
 
357
    for line in line_iter:
 
358
        if line.startswith(b'# '):
 
359
            line = line[2:]
 
360
        elif line.startswith(b'#'):
 
361
            line = line[1:]
 
362
        else:
 
363
            raise ValueError("bad line %r" % (line,))
 
364
        if last_line is not None and len(line) > 2:
 
365
            line = line[2:]
 
366
        line = re.sub(b'\r', b'', line)
 
367
        line = re.sub(b'\\\\(.|\n)', mapget, line)
 
368
        if last_line is None:
 
369
            last_line = line
 
370
        else:
 
371
            last_line += line
 
372
        if last_line[-1:] == b'\n':
 
373
            yield last_line
 
374
            last_line = None
 
375
    if last_line is not None:
 
376
        yield last_line
 
377
 
 
378
 
 
379
def read_patch_stanza(line_iter):
 
380
    """Convert an iterable of RIO-Patch format lines into a Stanza.
 
381
 
 
382
    RIO-Patch is a RIO variant designed to be e-mailed as part of a patch.
 
383
    It resists common forms of damage such as newline conversion or the removal
 
384
    of trailing whitespace, yet is also reasonably easy to read.
 
385
 
 
386
    :return: a Stanza
 
387
    """
 
388
    return read_stanza(_patch_stanza_iter(line_iter))
 
389
 
 
390
 
 
391
try:
 
392
    from ._rio_pyx import (
 
393
        _read_stanza_utf8,
 
394
        _read_stanza_unicode,
 
395
        _valid_tag,
 
396
        )
 
397
except ImportError as e:
 
398
    osutils.failed_to_load_extension(e)
 
399
    from ._rio_py import (
 
400
       _read_stanza_utf8,
 
401
       _read_stanza_unicode,
 
402
       _valid_tag,
 
403
       )