/brz/remove-bazaar

To get this branch, use:
bzr branch http://gegoxaren.bato24.eu/bzr/brz/remove-bazaar

« back to all changes in this revision

Viewing changes to bzrlib/tests/test_pack.py

Add validate method to ContainerReader and BytesRecordReader.

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
# Copyright (C) 2007 Canonical Ltd
 
2
#
 
3
# This program is free software; you can redistribute it and/or modify
 
4
# it under the terms of the GNU General Public License as published by
 
5
# the Free Software Foundation; either version 2 of the License, or
 
6
# (at your option) any later version.
 
7
#
 
8
# This program is distributed in the hope that it will be useful,
 
9
# but WITHOUT ANY WARRANTY; without even the implied warranty of
 
10
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
11
# GNU General Public License for more details.
 
12
#
 
13
# You should have received a copy of the GNU General Public License
 
14
# along with this program; if not, write to the Free Software
 
15
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
 
16
 
 
17
"""Tests for bzrlib.pack."""
 
18
 
 
19
 
 
20
from cStringIO import StringIO
 
21
 
 
22
from bzrlib import pack, errors, tests
 
23
 
 
24
 
 
25
class TestContainerWriter(tests.TestCase):
    """Tests for serialising containers with pack.ContainerWriter."""

    def test_construct(self):
        """A ContainerWriter can be built without touching its stream.

        None is passed in place of the write callable to show that the
        constructor doesn't try to use the output stream.
        """
        pack.ContainerWriter(None)

    def test_begin(self):
        """begin() emits the container format marker line."""
        sink = StringIO()
        container = pack.ContainerWriter(sink.write)
        container.begin()
        written = sink.getvalue()
        self.assertEqual('Bazaar pack format 1\n', written)

    def test_end(self):
        """end() emits an End Marker record after the format line."""
        sink = StringIO()
        container = pack.ContainerWriter(sink.write)
        container.begin()
        container.end()
        written = sink.getvalue()
        self.assertEqual('Bazaar pack format 1\nE', written)

    def test_add_bytes_record_no_name(self):
        """A bytes record added with an empty list of names."""
        sink = StringIO()
        container = pack.ContainerWriter(sink.write)
        container.begin()
        container.add_bytes_record('abc', names=[])
        written = sink.getvalue()
        self.assertEqual('Bazaar pack format 1\nB3\n\nabc', written)

    def test_add_bytes_record_one_name(self):
        """A bytes record added with a single name."""
        sink = StringIO()
        container = pack.ContainerWriter(sink.write)
        container.begin()
        container.add_bytes_record('abc', names=['name1'])
        written = sink.getvalue()
        self.assertEqual('Bazaar pack format 1\nB3\nname1\n\nabc', written)

    def test_add_bytes_record_two_names(self):
        """A bytes record added with two names."""
        sink = StringIO()
        container = pack.ContainerWriter(sink.write)
        container.begin()
        container.add_bytes_record('abc', names=['name1', 'name2'])
        written = sink.getvalue()
        self.assertEqual(
            'Bazaar pack format 1\nB3\nname1\nname2\n\nabc', written)

    def test_add_bytes_record_invalid_name(self):
        """A name containing whitespace makes add_bytes_record raise
        InvalidRecordError.
        """
        sink = StringIO()
        container = pack.ContainerWriter(sink.write)
        container.begin()
        self.assertRaises(
            errors.InvalidRecordError,
            container.add_bytes_record, 'abc', names=['bad name'])
 
86
 
 
87
 
 
88
class TestContainerReader(tests.TestCase):
    """Tests for parsing containers with pack.ContainerReader."""

    def get_reader_for(self, bytes):
        """Return a ContainerReader that reads from the given byte string."""
        return pack.ContainerReader(StringIO(bytes).read)

    def test_construct(self):
        """A ContainerReader can be built without touching its stream.

        None is passed in place of the read callable to show that the
        constructor doesn't try to use the input stream.
        """
        pack.ContainerReader(None)

    def test_empty_container(self):
        """An empty container yields no records."""
        container = self.get_reader_for("Bazaar pack format 1\nE")
        records = list(container.iter_records())
        self.assertEqual([], records)

    def test_unknown_format(self):
        """An unrecognised format line raises UnknownContainerFormatError."""
        container = self.get_reader_for("unknown format\n")
        self.assertRaises(
            errors.UnknownContainerFormatError, container.iter_records)

    def test_unexpected_end_of_container(self):
        """A container lacking an End Marker record raises
        UnexpectedEndOfContainerError when iterated.
        """
        container = self.get_reader_for("Bazaar pack format 1\n")
        record_iter = container.iter_records()
        self.assertRaises(
            errors.UnexpectedEndOfContainerError, record_iter.next)

    def test_unknown_record_type(self):
        """An unknown record type raises UnknownRecordTypeError."""
        container = self.get_reader_for("Bazaar pack format 1\nX")
        record_iter = container.iter_records()
        self.assertRaises(
            errors.UnknownRecordTypeError, record_iter.next)

    def test_container_with_one_unnamed_record(self):
        """Read a container holding a single Bytes record.

        Bytes record parsing is covered in more depth by
        TestBytesRecordReader; this only checks that ContainerReader and
        BytesRecordReader work together.
        """
        container = self.get_reader_for("Bazaar pack format 1\nB5\n\naaaaaE")
        records = list(container.iter_records())
        self.assertEqual([([], 'aaaaa')], records)

    def test_validate_empty_container(self):
        """validate() accepts an empty container."""
        container = self.get_reader_for("Bazaar pack format 1\nE")
        # Passing means validate() returned without raising.
        container.validate()

    def test_validate_non_empty_valid_container(self):
        """validate() accepts a container holding one named record."""
        container = self.get_reader_for(
            "Bazaar pack format 1\nB3\nname\n\nabcE")
        # Passing means validate() returned without raising.
        container.validate()

    def test_validate_bad_format(self):
        """validate() rejects a missing, truncated or unknown format line."""
        acceptable_errors = (
            errors.UnexpectedEndOfContainerError,
            errors.UnknownContainerFormatError,
        )
        for bad_bytes in ["", "x", "Bazaar pack format 1", "bad\n"]:
            container = self.get_reader_for(bad_bytes)
            self.assertRaises(acceptable_errors, container.validate)

    def test_validate_bad_record_marker(self):
        """validate() raises UnknownRecordTypeError for an unknown marker."""
        container = self.get_reader_for("Bazaar pack format 1\nX")
        self.assertRaises(errors.UnknownRecordTypeError, container.validate)

    def test_validate_data_after_end_marker(self):
        """validate() raises ContainerHasExcessDataError when bytes follow
        the End Marker.
        """
        container = self.get_reader_for("Bazaar pack format 1\nEcrud")
        self.assertRaises(
            errors.ContainerHasExcessDataError, container.validate)

    def test_validate_no_end_marker(self):
        """validate() raises UnexpectedEndOfContainerError when the End
        Marker is absent.
        """
        container = self.get_reader_for("Bazaar pack format 1\n")
        self.assertRaises(
            errors.UnexpectedEndOfContainerError, container.validate)
 
173
 
 
174
 
 
175
class TestBytesRecordReader(tests.TestCase):
    """Tests for parsing Bytes records with BytesRecordReader."""

    def get_reader_for(self, bytes):
        """Return a BytesRecordReader that reads from the given byte string."""
        stream = StringIO(bytes)
        reader = pack.BytesRecordReader(stream.read)
        return reader

    def test_record_with_no_name(self):
        """Reading a Bytes record with no name returns an empty list of
        names.
        """
        reader = self.get_reader_for("5\n\naaaaa")
        names, bytes = reader.read()
        self.assertEqual([], names)
        self.assertEqual('aaaaa', bytes)

    def test_record_with_one_name(self):
        """Reading a Bytes record with one name returns a list of just that
        name.
        """
        reader = self.get_reader_for("5\nname1\n\naaaaa")
        names, bytes = reader.read()
        self.assertEqual(['name1'], names)
        self.assertEqual('aaaaa', bytes)

    def test_record_with_two_names(self):
        """Reading a Bytes record with two names returns a list of both names.
        """
        reader = self.get_reader_for("5\nname1\nname2\n\naaaaa")
        names, bytes = reader.read()
        self.assertEqual(['name1', 'name2'], names)
        self.assertEqual('aaaaa', bytes)

    def test_invalid_length(self):
        """If the length-prefix is not a number, parsing raises
        InvalidRecordError.
        """
        reader = self.get_reader_for("not a number\n")
        self.assertRaises(errors.InvalidRecordError, reader.read)

    def test_early_eof(self):
        """Tests for premature EOF occurring during parsing Bytes records with
        BytesRecordReader.

        An incomplete container might be interrupted at any point.  The
        BytesRecordReader needs to cope with the input stream running out no
        matter where it is in the parsing process.

        In all cases, UnexpectedEndOfContainerError should be raised.
        """
        complete_record = "6\nname\n\nabcdef"
        # Every proper prefix of the record (including the empty one) is an
        # incomplete record; the full record itself is deliberately excluded.
        for count in range(0, len(complete_record)):
            incomplete_record = complete_record[:count]
            reader = self.get_reader_for(incomplete_record)
            # We don't use assertRaises to make diagnosing failures easier.
            try:
                reader.read()
            except errors.UnexpectedEndOfContainerError:
                pass
            else:
                # Report the exact truncated input that was wrongly accepted.
                # (Previously this referenced the builtin ``input``, which
                # has no getvalue(), so the failure path itself crashed.)
                self.fail(
                    "UnexpectedEndOfContainerError not raised when parsing %r"
                    % (incomplete_record,))

    def test_initial_eof(self):
        """EOF before any bytes read at all."""
        reader = self.get_reader_for("")
        self.assertRaises(errors.UnexpectedEndOfContainerError, reader.read)

    def test_eof_after_length(self):
        """EOF after reading the length and before reading name(s)."""
        reader = self.get_reader_for("123\n")
        self.assertRaises(errors.UnexpectedEndOfContainerError, reader.read)

    def test_eof_during_name(self):
        """EOF during reading a name."""
        reader = self.get_reader_for("123\nname")
        self.assertRaises(errors.UnexpectedEndOfContainerError, reader.read)

    def test_invalid_name_whitespace(self):
        """Names must have no whitespace."""
        # A name with a space.
        reader = self.get_reader_for("0\nbad name\n\n")
        self.assertRaises(errors.InvalidRecordError, reader.read)

        # A name with a tab.
        reader = self.get_reader_for("0\nbad\tname\n\n")
        self.assertRaises(errors.InvalidRecordError, reader.read)

        # A name with a vertical tab.
        reader = self.get_reader_for("0\nbad\vname\n\n")
        self.assertRaises(errors.InvalidRecordError, reader.read)

    def test_validate_whitespace_in_name(self):
        """validate() raises InvalidRecordError for a name with a space."""
        reader = self.get_reader_for("0\nbad name\n\nE")
        self.assertRaises(errors.InvalidRecordError, reader.validate)

    def test_validate_interrupted_prelude(self):
        """validate() raises UnexpectedEndOfContainerError on empty input."""
        reader = self.get_reader_for("")
        self.assertRaises(
            errors.UnexpectedEndOfContainerError, reader.validate)

    def test_validate_interrupted_body(self):
        """validate() raises UnexpectedEndOfContainerError when the declared
        length is 1 but no body bytes follow.
        """
        reader = self.get_reader_for("1\n\n")
        self.assertRaises(
            errors.UnexpectedEndOfContainerError, reader.validate)

    def test_validate_unparseable_length(self):
        """validate() raises InvalidRecordError when the length line is
        empty.
        """
        reader = self.get_reader_for("\n\n")
        self.assertRaises(
            errors.InvalidRecordError, reader.validate)
 
286