1
# Copyright (C) 2007 Canonical Ltd
3
# This program is free software; you can redistribute it and/or modify
4
# it under the terms of the GNU General Public License as published by
5
# the Free Software Foundation; either version 2 of the License, or
6
# (at your option) any later version.
8
# This program is distributed in the hope that it will be useful,
9
# but WITHOUT ANY WARRANTY; without even the implied warranty of
10
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11
# GNU General Public License for more details.
13
# You should have received a copy of the GNU General Public License
14
# along with this program; if not, write to the Free Software
15
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
17
"""Tests for bzrlib.pack."""
20
from cStringIO import StringIO
22
from bzrlib import pack, errors, tests
25
class TestContainerWriter(tests.TestCase):
27
def test_construct(self):
    """Constructing a ContainerWriter must not touch its output stream.

    None is supplied in place of the output stream; the test passes as
    long as the constructor does not try to use it.
    """
    pack.ContainerWriter(None)
36
"""The begin() method writes the container format marker line."""
38
writer = pack.ContainerWriter(output.write)
40
self.assertEqual('Bazaar pack format 1\n', output.getvalue())
43
"""The end() method writes an End Marker record."""
45
writer = pack.ContainerWriter(output.write)
48
self.assertEqual('Bazaar pack format 1\nE', output.getvalue())
50
def test_add_bytes_record_no_name(self):
51
"""Add a bytes record with no name."""
53
writer = pack.ContainerWriter(output.write)
55
writer.add_bytes_record('abc', names=[])
56
self.assertEqual('Bazaar pack format 1\nB3\n\nabc', output.getvalue())
58
def test_add_bytes_record_one_name(self):
59
"""Add a bytes record with one name."""
61
writer = pack.ContainerWriter(output.write)
63
writer.add_bytes_record('abc', names=['name1'])
64
self.assertEqual('Bazaar pack format 1\nB3\nname1\n\nabc',
67
def test_add_bytes_record_two_names(self):
68
"""Add a bytes record with two names."""
70
writer = pack.ContainerWriter(output.write)
72
writer.add_bytes_record('abc', names=['name1', 'name2'])
73
self.assertEqual('Bazaar pack format 1\nB3\nname1\nname2\n\nabc',
76
def test_add_bytes_record_invalid_name(self):
77
"""Adding a Bytes record with a name with whitespace in it raises
81
writer = pack.ContainerWriter(output.write)
84
errors.InvalidRecordError,
85
writer.add_bytes_record, 'abc', names=['bad name'])
88
class TestContainerReader(tests.TestCase):
90
def test_construct(self):
    """Constructing a ContainerReader must not touch its input stream.

    None is supplied in place of the input stream; the test passes as
    long as the constructor does not try to use it.
    """
    pack.ContainerReader(None)
98
def test_empty_container(self):
    """A container holding only the End Marker yields no records."""
    stream = StringIO("Bazaar pack format 1\nE")
    container_reader = pack.ContainerReader(stream.read)
    records = list(container_reader.iter_records())
    self.assertEqual([], records)
104
def test_unknown_format(self):
105
"""Unrecognised container formats raise UnknownContainerFormatError."""
106
input = StringIO("unknown format\n")
107
reader = pack.ContainerReader(input.read)
109
errors.UnknownContainerFormatError, reader.iter_records)
111
def test_unexpected_end_of_container(self):
112
"""Containers that don't end with an End Marker record should cause
113
UnexpectedEndOfContainerError to be raised.
115
input = StringIO("Bazaar pack format 1\n")
116
reader = pack.ContainerReader(input.read)
117
iterator = reader.iter_records()
119
errors.UnexpectedEndOfContainerError, iterator.next)
121
def test_unknown_record_type(self):
122
"""Unknown record types cause UnknownRecordTypeError to be raised."""
123
input = StringIO("Bazaar pack format 1\nX")
124
reader = pack.ContainerReader(input.read)
125
iterator = reader.iter_records()
127
errors.UnknownRecordTypeError, iterator.next)
129
def test_container_with_one_unnamed_record(self):
    """A container with a single nameless Bytes record yields that record.

    Bytes record parsing itself is exercised in more depth by
    TestBytesRecordReader; this test only checks that ContainerReader and
    BytesRecordReader work together.
    """
    stream = StringIO("Bazaar pack format 1\nB5\n\naaaaaE")
    container_reader = pack.ContainerReader(stream.read)
    records = list(container_reader.iter_records())
    self.assertEqual([([], 'aaaaa')], records)
142
class TestBytesRecordReader(tests.TestCase):
143
"""Tests for parsing Bytes records with BytesRecordReader."""
145
def test_record_with_no_name(self):
    """A nameless Bytes record reads as an empty name list and the body."""
    stream = StringIO("5\n\naaaaa")
    record_reader = pack.BytesRecordReader(stream.read)
    names, body = record_reader.read()
    self.assertEqual([], names)
    self.assertEqual('aaaaa', body)
155
def test_record_with_one_name(self):
    """A Bytes record with one name reads as a one-item name list."""
    stream = StringIO("5\nname1\n\naaaaa")
    record_reader = pack.BytesRecordReader(stream.read)
    names, body = record_reader.read()
    self.assertEqual(['name1'], names)
    self.assertEqual('aaaaa', body)
165
def test_record_with_two_names(self):
    """A Bytes record with two names reads as a list of both, in order."""
    stream = StringIO("5\nname1\nname2\n\naaaaa")
    record_reader = pack.BytesRecordReader(stream.read)
    names, body = record_reader.read()
    self.assertEqual(['name1', 'name2'], names)
    self.assertEqual('aaaaa', body)
174
def test_invalid_length(self):
    """A non-numeric length prefix makes read() raise InvalidRecordError."""
    malformed = StringIO("not a number\n")
    record_reader = pack.BytesRecordReader(malformed.read)
    self.assertRaises(errors.InvalidRecordError, record_reader.read)
182
def test_early_eof(self):
183
"""Tests for premature EOF occurring during parsing Bytes records with
186
An incomplete container might be interrupted at any point. The
187
BytesRecordReader needs to cope with the input stream running out no
188
matter where it is in the parsing process.
190
In all cases, UnexpectedEndOfContainerError should be raised.
192
complete_record = "6\nname\n\nabcdef"
193
for count in range(0, len(complete_record)):
194
input = StringIO(complete_record[:count])
195
reader = pack.BytesRecordReader(input.read)
196
# We don't use assertRaises to make diagnosing failures easier.
199
except errors.UnexpectedEndOfContainerError:
203
"UnexpectedEndOfContainerError not raised when parsing %r"
204
% (input.getvalue()))
206
def test_initial_eof(self):
207
"""EOF before any bytes read at all."""
209
reader = pack.BytesRecordReader(input.read)
210
self.assertRaises(errors.UnexpectedEndOfContainerError, reader.read)
212
def test_eof_after_length(self):
    """Input ending right after the length line is an unexpected EOF."""
    truncated = StringIO("123\n")
    record_reader = pack.BytesRecordReader(truncated.read)
    self.assertRaises(
        errors.UnexpectedEndOfContainerError, record_reader.read)
218
def test_eof_during_name(self):
    """Input ending partway through a name is an unexpected EOF."""
    truncated = StringIO("123\nname")
    record_reader = pack.BytesRecordReader(truncated.read)
    self.assertRaises(
        errors.UnexpectedEndOfContainerError, record_reader.read)
224
def test_invalid_name_whitespace(self):
    """Names containing whitespace are rejected with InvalidRecordError."""
    malformed_records = (
        "0\nbad name\n\n",    # a name with a space
        "0\nbad\tname\n\n",   # a name with a tab
        "0\nbad\vname\n\n",   # a name with a vertical tab
    )
    for record_text in malformed_records:
        stream = StringIO(record_text)
        record_reader = pack.BytesRecordReader(stream.read)
        self.assertRaises(errors.InvalidRecordError, record_reader.read)