1
# Copyright (C) 2007 Canonical Ltd
3
# This program is free software; you can redistribute it and/or modify
4
# it under the terms of the GNU General Public License as published by
5
# the Free Software Foundation; either version 2 of the License, or
6
# (at your option) any later version.
8
# This program is distributed in the hope that it will be useful,
9
# but WITHOUT ANY WARRANTY; without even the implied warranty of
10
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11
# GNU General Public License for more details.
13
# You should have received a copy of the GNU General Public License
14
# along with this program; if not, write to the Free Software
15
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
17
"""Tests for bzrlib.pack."""
20
from cStringIO import StringIO
22
from bzrlib import pack, errors, tests
25
class TestContainerWriter(tests.TestCase):
27
def test_construct(self):
28
"""Test constructing a ContainerWriter.
30
This uses None as the output stream to show that the constructor doesn't
31
try to use the output stream.
33
writer = pack.ContainerWriter(None)
36
"""The begin() method writes the container format marker line."""
38
writer = pack.ContainerWriter(output.write)
40
self.assertEqual('Bazaar pack format 1\n', output.getvalue())
43
"""The end() method writes an End Marker record."""
45
writer = pack.ContainerWriter(output.write)
48
self.assertEqual('Bazaar pack format 1\nE', output.getvalue())
50
def test_add_bytes_record_no_name(self):
51
"""Add a bytes record with no name."""
53
writer = pack.ContainerWriter(output.write)
55
writer.add_bytes_record('abc', names=[])
56
self.assertEqual('Bazaar pack format 1\nB3\n\nabc', output.getvalue())
58
def test_add_bytes_record_one_name(self):
59
"""Add a bytes record with one name."""
61
writer = pack.ContainerWriter(output.write)
63
writer.add_bytes_record('abc', names=['name1'])
64
self.assertEqual('Bazaar pack format 1\nB3\nname1\n\nabc',
67
def test_add_bytes_record_two_names(self):
68
"""Add a bytes record with two names."""
70
writer = pack.ContainerWriter(output.write)
72
writer.add_bytes_record('abc', names=['name1', 'name2'])
73
self.assertEqual('Bazaar pack format 1\nB3\nname1\nname2\n\nabc',
76
def test_add_bytes_record_invalid_name(self):
77
"""Adding a Bytes record with a name with whitespace in it raises
81
writer = pack.ContainerWriter(output.write)
84
errors.InvalidRecordError,
85
writer.add_bytes_record, 'abc', names=['bad name'])
88
class TestContainerReader(tests.TestCase):
90
def get_reader_for(self, bytes):
91
stream = StringIO(bytes)
92
reader = pack.ContainerReader(stream.read)
95
def test_construct(self):
96
"""Test constructing a ContainerReader.
98
This uses None as the output stream to show that the constructor doesn't
99
try to use the input stream.
101
reader = pack.ContainerReader(None)
103
def test_empty_container(self):
104
"""Read an empty container."""
105
reader = self.get_reader_for("Bazaar pack format 1\nE")
106
self.assertEqual([], list(reader.iter_records()))
108
def test_unknown_format(self):
109
"""Unrecognised container formats raise UnknownContainerFormatError."""
110
reader = self.get_reader_for("unknown format\n")
112
errors.UnknownContainerFormatError, reader.iter_records)
114
def test_unexpected_end_of_container(self):
115
"""Containers that don't end with an End Marker record should cause
116
UnexpectedEndOfContainerError to be raised.
118
reader = self.get_reader_for("Bazaar pack format 1\n")
119
iterator = reader.iter_records()
121
errors.UnexpectedEndOfContainerError, iterator.next)
123
def test_unknown_record_type(self):
124
"""Unknown record types cause UnknownRecordTypeError to be raised."""
125
reader = self.get_reader_for("Bazaar pack format 1\nX")
126
iterator = reader.iter_records()
128
errors.UnknownRecordTypeError, iterator.next)
130
def test_container_with_one_unnamed_record(self):
131
"""Read a container with one Bytes record.
133
Parsing Bytes records is more thoroughly exercised by
134
TestBytesRecordReader. This test is here to ensure that
135
ContainerReader's integration with BytesRecordReader is working.
137
reader = self.get_reader_for("Bazaar pack format 1\nB5\n\naaaaaE")
138
expected_records = [([], 'aaaaa')]
139
self.assertEqual(expected_records, list(reader.iter_records()))
141
def test_validate_empty_container(self):
142
reader = self.get_reader_for("Bazaar pack format 1\nE")
143
# No exception raised
146
def test_validate_non_empty_valid_container(self):
147
reader = self.get_reader_for("Bazaar pack format 1\nB3\nname\n\nabcE")
148
# No exception raised
151
def test_validate_bad_format(self):
152
inputs = ["", "x", "Bazaar pack format 1", "bad\n"]
154
reader = self.get_reader_for(input)
156
(errors.UnexpectedEndOfContainerError,
157
errors.UnknownContainerFormatError),
160
def test_validate_bad_record_marker(self):
161
reader = self.get_reader_for("Bazaar pack format 1\nX")
162
self.assertRaises(errors.UnknownRecordTypeError, reader.validate)
164
def test_validate_data_after_end_marker(self):
165
reader = self.get_reader_for("Bazaar pack format 1\nEcrud")
167
errors.ContainerHasExcessDataError, reader.validate)
169
def test_validate_no_end_marker(self):
170
reader = self.get_reader_for("Bazaar pack format 1\n")
172
errors.UnexpectedEndOfContainerError, reader.validate)
175
class TestBytesRecordReader(tests.TestCase):
176
"""Tests for parsing Bytes records with BytesRecordReader."""
178
def get_reader_for(self, bytes):
179
stream = StringIO(bytes)
180
reader = pack.BytesRecordReader(stream.read)
183
def test_record_with_no_name(self):
184
"""Reading a Bytes record with no name returns an empty list of
187
reader = self.get_reader_for("5\n\naaaaa")
188
names, bytes = reader.read()
189
self.assertEqual([], names)
190
self.assertEqual('aaaaa', bytes)
192
def test_record_with_one_name(self):
193
"""Reading a Bytes record with one name returns a list of just that
196
reader = self.get_reader_for("5\nname1\n\naaaaa")
197
names, bytes = reader.read()
198
self.assertEqual(['name1'], names)
199
self.assertEqual('aaaaa', bytes)
201
def test_record_with_two_names(self):
202
"""Reading a Bytes record with two names returns a list of both names.
204
reader = self.get_reader_for("5\nname1\nname2\n\naaaaa")
205
names, bytes = reader.read()
206
self.assertEqual(['name1', 'name2'], names)
207
self.assertEqual('aaaaa', bytes)
209
def test_invalid_length(self):
210
"""If the length-prefix is not a number, parsing raises
213
reader = self.get_reader_for("not a number\n")
214
self.assertRaises(errors.InvalidRecordError, reader.read)
216
def test_early_eof(self):
217
"""Tests for premature EOF occuring during parsing Bytes records with
220
A incomplete container might be interrupted at any point. The
221
BytesRecordReader needs to cope with the input stream running out no
222
matter where it is in the parsing process.
224
In all cases, UnexpectedEndOfContainerError should be raised.
226
complete_record = "6\nname\n\nabcdef"
227
for count in range(0, len(complete_record)):
228
reader = self.get_reader_for(complete_record[:count])
229
# We don't use assertRaises to make diagnosing failures easier.
232
except errors.UnexpectedEndOfContainerError:
236
"UnexpectedEndOfContainerError not raised when parsing %r"
237
% (input.getvalue()))
239
def test_initial_eof(self):
240
"""EOF before any bytes read at all."""
241
reader = self.get_reader_for("")
242
self.assertRaises(errors.UnexpectedEndOfContainerError, reader.read)
244
def test_eof_after_length(self):
245
"""EOF after reading the length and before reading name(s)."""
246
reader = self.get_reader_for("123\n")
247
self.assertRaises(errors.UnexpectedEndOfContainerError, reader.read)
249
def test_eof_during_name(self):
250
"""EOF during reading a name."""
251
reader = self.get_reader_for("123\nname")
252
self.assertRaises(errors.UnexpectedEndOfContainerError, reader.read)
254
def test_invalid_name_whitespace(self):
255
"""Names must have no whitespace."""
256
# A name with a space.
257
reader = self.get_reader_for("0\nbad name\n\n")
258
self.assertRaises(errors.InvalidRecordError, reader.read)
261
reader = self.get_reader_for("0\nbad\tname\n\n")
262
self.assertRaises(errors.InvalidRecordError, reader.read)
264
# A name with a vertical tab.
265
reader = self.get_reader_for("0\nbad\vname\n\n")
266
self.assertRaises(errors.InvalidRecordError, reader.read)
268
def test_validate_whitespace_in_name(self):
269
reader = self.get_reader_for("0\nbad name\n\nE")
270
self.assertRaises(errors.InvalidRecordError, reader.validate)
272
def test_validate_interrupted_prelude(self):
273
reader = self.get_reader_for("")
275
errors.UnexpectedEndOfContainerError, reader.validate)
277
def test_validate_interrupted_body(self):
278
reader = self.get_reader_for("1\n\n")
280
errors.UnexpectedEndOfContainerError, reader.validate)
282
def test_validate_unparseable_length(self):
283
reader = self.get_reader_for("\n\n")
285
errors.InvalidRecordError, reader.validate)