1
# Copyright (C) 2005 Canonical Ltd
3
# This program is free software; you can redistribute it and/or modify
4
# it under the terms of the GNU General Public License as published by
5
# the Free Software Foundation; either version 2 of the License, or
6
# (at your option) any later version.
8
# This program is distributed in the hope that it will be useful,
9
# but WITHOUT ANY WARRANTY; without even the implied warranty of
10
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11
# GNU General Public License for more details.
13
# You should have received a copy of the GNU General Public License
14
# along with this program; if not, write to the Free Software
15
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
17
from __future__ import absolute_import
19
# \subsection{\emph{rio} - simple text metaformat}
21
# \emph{r} stands for `restricted', `reproducible', or `rfc822-like'.
23
# The stored data consists of a series of \emph{stanzas}, each of which contains
24
# \emph{fields} identified by an ascii name, with Unicode or string contents.
25
# The field tag is constrained to alphanumeric characters.
26
# There may be more than one field in a stanza with the same name.
28
# The format itself does not deal with character encoding issues, though
29
# the result will normally be written in Unicode.
31
# The format is intended to be simple enough that there is exactly one character
32
# stream representation of an object and vice versa, and that this relation
33
# will continue to hold for future versions of bzr.
38
from .iterablefile import IterableFile
43
# XXX: there is some redundancy in allowing stanzas to be written in isolation as well as
44
# through a writer object.
47
# Writes stanzas to the file-like object handed to the constructor.
class RioWriter(object):
49
# to_file is kept as-is; write_stanza() later calls write() on it with bytes.
def __init__(self, to_file):
51
self._to_file = to_file
53
# Writes one stanza to the stored file via Stanza.write().
# NOTE(review): the b'\n' written first looks like a separator between
# consecutive stanzas -- confirm under which condition it is emitted.
def write_stanza(self, stanza):
55
self._to_file.write(b'\n')
56
stanza.write(self._to_file)
60
class RioReader(object):
61
"""Read stanzas from a file as a sequence
63
from_file can be anything that can be enumerated as a sequence of
64
lines (with newlines.)
67
# from_file is only stored here; reading happens later through read_stanza().
def __init__(self, from_file):
68
self._from_file = from_file
72
# Parse one stanza from the stored line source with the UTF-8 reader.
s = read_stanza(self._from_file)
79
def rio_file(stanzas, header=None):
80
"""Produce a rio IterableFile from an iterable of stanzas"""
82
# The header is optional; it is only acted on when one was supplied.
if header is not None:
86
# NOTE(review): identity test against True; `if not first_stanza:` would be
# the idiomatic spelling provided first_stanza is always a bool -- confirm.
if first_stanza is not True:
88
# Each stanza contributes the byte lines produced by its to_lines().
for line in s.to_lines():
91
# The result of str_iter() is wrapped in an IterableFile and returned.
# NOTE(review): str_iter is presumably a generator defined inside this
# function -- confirm.
return IterableFile(str_iter())
94
# Reads stanzas from from_file using read_stanza(), i.e. the reader whose
# raw input lines must be UTF-8 encoded.
def read_stanzas(from_file):
97
s = read_stanza(from_file)
103
# Unicode counterpart of read_stanzas(): reads from from_file using
# read_stanza_unicode(), whose input lines are already Unicode strings.
def read_stanzas_unicode(from_file):
106
s = read_stanza_unicode(from_file)
112
# A stanza is held as self.items, a list of (tag, value) pairs in insertion
# order; duplicate tags are simply additional entries in that list.
class Stanza(object):
113
"""One stanza for rio.
115
Each stanza contains a set of named fields.
117
Names must be non-empty ascii alphanumeric plus _. Names can be repeated
118
within a stanza. Names are case-sensitive. The ordering of fields is
121
Each field value must be either an int or a string.
124
# 'items' is the only attribute declared for instances.
__slots__ = ['items']
126
def __init__(self, **kwargs):
127
"""Construct a new Stanza.
129
The keyword arguments, if any, are added in sorted order to the stanza.
133
# Iterating the keyword pairs in sorted order makes the field order
# independent of how the caller spelled the call.
for tag, value in sorted(kwargs.items()):
136
def add(self, tag, value):
137
"""Append a name and value to the stanza."""
138
# Tags that valid_tag() rejects raise ValueError.
if not valid_tag(tag):
139
raise ValueError("invalid tag %r" % (tag,))
140
# Byte-string values are decoded as ASCII before being stored, so
# non-ASCII bytes make decode() raise UnicodeDecodeError.
if isinstance(value, bytes):
141
value = value.decode('ascii')
142
elif isinstance(value, text_type):
145
# NOTE(review): the class docstring mentions int values, but the type
# checks visible here only cover bytes and text_type -- confirm which
# of the two is current.
raise TypeError("invalid type for rio value: %r of type %s"
146
% (value, type(value)))
147
self.items.append((tag, value))
150
# Alternate constructor taking (tag, value) pairs.
# NOTE(review): presumably a @classmethod given the `cls` parameter -- confirm.
def from_pairs(cls, pairs):
155
def __contains__(self, find_tag):
156
"""True if there is any field in this stanza with the given tag."""
157
# Linear scan over the stored pairs.
for tag, value in self.items:
163
"""Return number of pairs in the stanza."""
164
return len(self.items)
166
# Equality is decided by comparing the items lists; objects that are not
# Stanza instances take the isinstance branch first.
def __eq__(self, other):
167
if not isinstance(other, Stanza):
169
return self.items == other.items
171
# Defined as the negation of __eq__.
def __ne__(self, other):
172
return not self.__eq__(other)
175
# Debug representation: the class name wrapped around repr() of the pairs.
return "Stanza(%r)" % self.items
177
def iter_pairs(self):
178
"""Return iterator of tag, value pairs."""
179
return iter(self.items)
182
"""Generate sequence of lines for external version of this file.
184
The lines are always utf-8 encoded strings.
187
# max() complains if sequence is empty
190
# Serialisation to bytes: tags are encoded as ASCII, values as UTF-8.
for text_tag, text_value in self.items:
191
tag = text_tag.encode('ascii')
192
value = text_value.encode('utf-8')
194
# Form with nothing after the b': ' separator.
result.append(tag + b': \n')
196
# don't want splitlines behaviour on empty lines
197
# Multi-line form: the first value line follows the tag, and every further
# line is emitted as a continuation line starting with a tab.
val_lines = value.split(b'\n')
198
result.append(tag + b': ' + val_lines[0] + b'\n')
199
for line in val_lines[1:]:
200
result.append(b'\t' + line + b'\n')
202
# Single-line form: b'tag: value\n'.
result.append(tag + b': ' + value + b'\n')
206
"""Return stanza as a single byte string"""
207
return b''.join(self.to_lines())
209
def to_unicode(self):
210
"""Return stanza as a single Unicode string.
212
This is most useful when adding a Stanza to a parent Stanza
218
# Same layout as the byte serialisation above, built from Unicode
# literals and without any encoding step.
for tag, value in self.items:
220
result.append(tag + u': \n')
222
# don't want splitlines behaviour on empty lines
223
val_lines = value.split(u'\n')
224
result.append(tag + u': ' + val_lines[0] + u'\n')
225
for line in val_lines[1:]:
226
result.append(u'\t' + line + u'\n')
228
result.append(tag + u': ' + value + u'\n')
229
return u''.join(result)
231
def write(self, to_file):
232
"""Write stanza to a file"""
233
# to_lines() produces bytes, so to_file.writelines() must accept bytes.
to_file.writelines(self.to_lines())
236
"""Return the value for a field with given tag.
238
If there is more than one value, only the first is returned. If the
239
tag is not present, KeyError is raised.
241
# Linear scan over the stored pairs.
for t, v in self.items:
249
# Scans every stored pair for the given tag.
def get_all(self, tag):
251
for t, v in self.items:
257
"""Return a dict containing the unique values of the stanza.
260
for tag, value in self.items:
266
return _valid_tag(tag)
269
def read_stanza(line_iter):
270
"""Return new Stanza read from list of lines or a file
272
Returns one Stanza that was read, or returns None at end of file. If a
273
blank line follows the stanza, it is consumed. It's not an error for
274
there to be no blank at end of file. If there is a blank line at the
275
start of the input this is really an empty stanza and that is returned.
277
Only the stanza lines and the trailing blank (if any) are consumed
280
The raw lines must be in utf-8 encoding.
282
# Thin public wrapper: all parsing is done by _read_stanza_utf8.
# NOTE(review): presumably imported at the bottom of the module together
# with _read_stanza_unicode -- confirm.
return _read_stanza_utf8(line_iter)
285
def read_stanza_unicode(unicode_iter):
286
"""Read a Stanza from a list of lines or a file.
288
The lines should already be in unicode form. This returns a single
289
stanza that was read. If there is a blank line at the end of the Stanza,
290
it is consumed. It is not an error for there to be no blank line at
291
the end of the iterable. If there is a blank line at the beginning,
292
this is treated as an empty Stanza and None is returned.
294
Only the stanza lines and the trailing blank (if any) are consumed
295
from the unicode_iter
297
:param unicode_iter: An iterable, yielding Unicode strings. See read_stanza
298
if you have a utf-8 encoded string.
299
:return: A Stanza object if there are any lines in the file.
302
# Thin public wrapper: parsing is done by _read_stanza_unicode, which the
# module imports from ._rio_pyx, falling back to ._rio_py on ImportError.
return _read_stanza_unicode(unicode_iter)
305
def to_patch_lines(stanza, max_width=72):
306
"""Convert a stanza into RIO-Patch format lines.
308
RIO-Patch is a RIO variant designed to be e-mailed as part of a patch.
309
It resists common forms of damage such as newline conversion or the removal
310
of trailing whitespace, yet is also reasonably easy to read.
312
:param max_width: The maximum number of characters per physical line.
313
:return: a list of lines
316
# A max_width the function cannot work with is reported by raising
# ValueError carrying the offending value.
raise ValueError(max_width)
317
# Four columns of every physical line are held back from the payload; the
# b'# ' prefix added below uses two of them.
# NOTE(review): confirm what the remaining two are reserved for.
max_rio_width = max_width - 4
319
# Work on the stanza's byte serialisation; the final element of the split
# (whatever follows the last newline) is dropped by [:-1].
for pline in stanza.to_lines():
320
for line in pline.split(b'\n')[:-1]:
321
# Double every backslash so the escapes introduced below can be told
# apart from literal backslashes when reading the lines back.
line = re.sub(b'\\\\', b'\\\\\\\\', line)
323
# Peel off at most max_rio_width bytes; the remainder stays in `line`.
partline = line[:max_rio_width]
324
line = line[max_rio_width:]
325
# NOTE(review): `line[:1]` is a bytes slice and `[b' ']` is a list, so this
# inequality is always true and the test reduces to `len(line) > 0`.
# `!= b' '` looks like the intent -- confirm before changing, because it
# alters where lines get broken.
if len(line) > 0 and line[:1] != [b' ']:
327
# Candidate break points are searched within the last 20 bytes of
# partline: a space, then b'-', then b'/'.
break_index = partline.rfind(b' ', -20)
329
break_index = partline.rfind(b'-', -20)
332
break_index = partline.rfind(b'/', -20)
334
# Push everything from the break point onwards back onto the unprocessed
# remainder so that it starts the next physical line.
line = partline[break_index:] + line
335
partline = partline[:break_index]
338
# A carriage return is written out as the two characters backslash, 'r'.
partline = re.sub(b'\r', b'\\\\r', partline)
342
# A physical line ending in a space gets special treatment.
# NOTE(review): presumably to protect it from trailing-whitespace
# stripping, as the docstring describes -- confirm.
elif re.search(b' $', partline):
345
# Every emitted physical line carries the b'# ' prefix and ends in a newline.
lines.append(b'# ' + partline + b'\n')
347
lines.append(b'# \n')
351
# Helper for read_patch_stanza(): consumes RIO-Patch lines from line_iter and
# undoes the framing and escaping applied by to_patch_lines(), so that the
# result can be handed to read_stanza().
def _patch_stanza_iter(line_iter):
352
# Escape table: a doubled backslash decodes to a single backslash.
# NOTE(review): the local name `map` shadows the builtin of the same name.
map = {b'\\\\': b'\\',
357
# Body of the replacement callback used with re.sub() below: look the
# matched escape sequence up in the table.
return map[match.group(0)]
360
for line in line_iter:
361
# Lines are recognised by their b'# ' prefix, or by a bare b'#' prefix.
if line.startswith(b'# '):
363
elif line.startswith(b'#'):
366
# NOTE(review): presumably the fall-through for lines matching neither
# prefix test -- confirm.
raise ValueError("bad line %r" % (line,))
367
if last_line is not None and len(line) > 2:
369
# Carriage returns are dropped, then every backslash escape (a backslash
# followed by any character, newline included) is decoded via mapget.
line = re.sub(b'\r', b'', line)
370
line = re.sub(b'\\\\(.|\n)', mapget, line)
371
# last_line carries state from one physical line to the next.
# NOTE(review): presumably so that continuation lines can be joined back
# into one logical line -- confirm.
if last_line is None:
375
if last_line[-1:] == b'\n':
378
if last_line is not None:
382
def read_patch_stanza(line_iter):
383
"""Convert an iterable of RIO-Patch format lines into a Stanza.
385
RIO-Patch is a RIO variant designed to be e-mailed as part of a patch.
386
It resists common forms of damage such as newline conversion or the removal
387
of trailing whitespace, yet is also reasonably easy to read.
391
# The patch framing is removed by _patch_stanza_iter() first; what it
# produces is then parsed like any other UTF-8 stanza by read_stanza().
return read_stanza(_patch_stanza_iter(line_iter))
395
from ._rio_pyx import (
397
_read_stanza_unicode,
400
except ImportError as e:
401
osutils.failed_to_load_extension(e)
402
from ._rio_py import (
404
_read_stanza_unicode,