22
22
from bzrlib import pack, errors, tests
25
class TestContainerSerialiser(tests.TestCase):
26
"""Tests for the ContainerSerialiser class."""
28
def test_construct(self):
29
"""Test constructing a ContainerSerialiser."""
30
pack.ContainerSerialiser()
33
serialiser = pack.ContainerSerialiser()
34
self.assertEqual('Bazaar pack format 1 (introduced in 0.18)\n',
38
serialiser = pack.ContainerSerialiser()
39
self.assertEqual('E', serialiser.end())
41
def test_bytes_record_no_name(self):
42
serialiser = pack.ContainerSerialiser()
43
record = serialiser.bytes_record('bytes', [])
44
self.assertEqual('B5\n\nbytes', record)
46
def test_bytes_record_one_name_with_one_part(self):
47
serialiser = pack.ContainerSerialiser()
48
record = serialiser.bytes_record('bytes', [('name',)])
49
self.assertEqual('B5\nname\n\nbytes', record)
51
def test_bytes_record_one_name_with_two_parts(self):
52
serialiser = pack.ContainerSerialiser()
53
record = serialiser.bytes_record('bytes', [('part1', 'part2')])
54
self.assertEqual('B5\npart1\x00part2\n\nbytes', record)
56
def test_bytes_record_two_names(self):
57
serialiser = pack.ContainerSerialiser()
58
record = serialiser.bytes_record('bytes', [('name1',), ('name2',)])
59
self.assertEqual('B5\nname1\nname2\n\nbytes', record)
61
def test_bytes_record_whitespace_in_name_part(self):
62
serialiser = pack.ContainerSerialiser()
64
errors.InvalidRecordError,
65
serialiser.bytes_record, 'bytes', [('bad name',)])
25
68
class TestContainerWriter(tests.TestCase):
71
tests.TestCase.setUp(self)
72
self.output = StringIO()
73
self.writer = pack.ContainerWriter(self.output.write)
75
def assertOutput(self, expected_output):
76
"""Assert that the output of self.writer ContainerWriter is equal to
79
self.assertEqual(expected_output, self.output.getvalue())
27
81
def test_construct(self):
28
82
"""Test constructing a ContainerWriter.
30
This uses None as the output stream to show that the constructor doesn't
31
try to use the output stream.
84
This uses None as the output stream to show that the constructor
85
doesn't try to use the output stream.
33
87
writer = pack.ContainerWriter(None)
35
89
def test_begin(self):
36
90
"""The begin() method writes the container format marker line."""
38
writer = pack.ContainerWriter(output.write)
40
self.assertEqual('Bazaar pack format 1 (introduced in 0.18)\n',
92
self.assertOutput('Bazaar pack format 1 (introduced in 0.18)\n')
94
def test_zero_records_written_after_begin(self):
95
"""After begin is written, 0 records have been written."""
97
self.assertEqual(0, self.writer.records_written)
43
99
def test_end(self):
44
100
"""The end() method writes an End Marker record."""
46
writer = pack.ContainerWriter(output.write)
49
self.assertEqual('Bazaar pack format 1 (introduced in 0.18)\nE',
103
self.assertOutput('Bazaar pack format 1 (introduced in 0.18)\nE')
105
def test_empty_end_does_not_add_a_record_to_records_written(self):
106
"""The end() method does not count towards the records written."""
109
self.assertEqual(0, self.writer.records_written)
111
def test_non_empty_end_does_not_add_a_record_to_records_written(self):
112
"""The end() method does not count towards the records written."""
114
self.writer.add_bytes_record('foo', names=[])
116
self.assertEqual(1, self.writer.records_written)
52
118
def test_add_bytes_record_no_name(self):
53
119
"""Add a bytes record with no name."""
55
writer = pack.ContainerWriter(output.write)
57
offset, length = writer.add_bytes_record('abc', names=[])
121
offset, length = self.writer.add_bytes_record('abc', names=[])
58
122
self.assertEqual((42, 7), (offset, length))
59
self.assertEqual('Bazaar pack format 1 (introduced in 0.18)\nB3\n\nabc',
124
'Bazaar pack format 1 (introduced in 0.18)\nB3\n\nabc')
62
126
def test_add_bytes_record_one_name(self):
63
127
"""Add a bytes record with one name."""
65
writer = pack.ContainerWriter(output.write)
67
offset, length = writer.add_bytes_record('abc', names=[('name1', )])
129
offset, length = self.writer.add_bytes_record(
130
'abc', names=[('name1', )])
68
131
self.assertEqual((42, 13), (offset, length))
70
'Bazaar pack format 1 (introduced in 0.18)\n'
74
def test_add_bytes_record_two_names(self):
75
"""Add a bytes record with two names."""
77
writer = pack.ContainerWriter(output.write)
79
offset, length = writer.add_bytes_record('abc', names=[('name1', ), ('name2', )])
80
self.assertEqual((42, 19), (offset, length))
82
'Bazaar pack format 1 (introduced in 0.18)\n'
83
'B3\nname1\nname2\n\nabc',
86
def test_add_bytes_record_two_names(self):
87
"""Add a bytes record with two names."""
89
writer = pack.ContainerWriter(output.write)
91
offset, length = writer.add_bytes_record('abc', names=[('name1', ), ('name2', )])
92
self.assertEqual((42, 19), (offset, length))
94
'Bazaar pack format 1 (introduced in 0.18)\n'
95
'B3\nname1\nname2\n\nabc',
133
'Bazaar pack format 1 (introduced in 0.18)\n'
136
def test_add_bytes_record_two_names(self):
137
"""Add a bytes record with two names."""
139
offset, length = self.writer.add_bytes_record(
140
'abc', names=[('name1', ), ('name2', )])
141
self.assertEqual((42, 19), (offset, length))
143
'Bazaar pack format 1 (introduced in 0.18)\n'
144
'B3\nname1\nname2\n\nabc')
146
def test_add_bytes_record_two_names(self):
147
"""Add a bytes record with two names."""
149
offset, length = self.writer.add_bytes_record(
150
'abc', names=[('name1', ), ('name2', )])
151
self.assertEqual((42, 19), (offset, length))
153
'Bazaar pack format 1 (introduced in 0.18)\n'
154
'B3\nname1\nname2\n\nabc')
98
156
def test_add_bytes_record_two_element_name(self):
99
157
"""Add a bytes record with a two-element name."""
101
writer = pack.ContainerWriter(output.write)
103
offset, length = writer.add_bytes_record('abc', names=[('name1', 'name2')])
159
offset, length = self.writer.add_bytes_record(
160
'abc', names=[('name1', 'name2')])
104
161
self.assertEqual((42, 19), (offset, length))
106
163
'Bazaar pack format 1 (introduced in 0.18)\n'
107
'B3\nname1\x00name2\n\nabc',
164
'B3\nname1\x00name2\n\nabc')
110
166
def test_add_second_bytes_record_gets_higher_offset(self):
112
writer = pack.ContainerWriter(output.write)
114
writer.add_bytes_record('abc', names=[])
115
offset, length = writer.add_bytes_record('abc', names=[])
168
self.writer.add_bytes_record('abc', names=[])
169
offset, length = self.writer.add_bytes_record('abc', names=[])
116
170
self.assertEqual((49, 7), (offset, length))
118
172
'Bazaar pack format 1 (introduced in 0.18)\n'
123
176
def test_add_bytes_record_invalid_name(self):
124
177
"""Adding a Bytes record with a name with whitespace in it raises
125
178
InvalidRecordError.
128
writer = pack.ContainerWriter(output.write)
130
181
self.assertRaises(
131
182
errors.InvalidRecordError,
132
writer.add_bytes_record, 'abc', names=[('bad name', )])
183
self.writer.add_bytes_record, 'abc', names=[('bad name', )])
185
def test_add_bytes_records_add_to_records_written(self):
186
"""Adding a Bytes record increments the records_written counter."""
188
self.writer.add_bytes_record('foo', names=[])
189
self.assertEqual(1, self.writer.records_written)
190
self.writer.add_bytes_record('foo', names=[])
191
self.assertEqual(2, self.writer.records_written)
135
194
class TestContainerReader(tests.TestCase):
195
"""Tests for the ContainerReader.
197
The ContainerReader reads format 1 containers, so these tests explicitly
198
test how it reacts to format 1 data. If a new version of the format is
199
added, then separate tests for that format should be added.
137
202
def get_reader_for(self, bytes):
138
203
stream = StringIO(bytes)
489
560
results.append(f.readline())
490
561
results.append(f.read(4))
491
562
self.assertEqual(['0', '\n', '2\n4\n'], results)
565
class PushParserTestCase(tests.TestCase):
566
"""Base class for TestCases involving ContainerPushParser."""
568
def make_parser_expecting_record_type(self):
569
parser = pack.ContainerPushParser()
570
parser.accept_bytes("Bazaar pack format 1 (introduced in 0.18)\n")
573
def make_parser_expecting_bytes_record(self):
574
parser = pack.ContainerPushParser()
575
parser.accept_bytes("Bazaar pack format 1 (introduced in 0.18)\nB")
578
def assertRecordParsing(self, expected_record, bytes):
579
"""Assert that 'bytes' is parsed as a given bytes record.
581
:param expected_record: A tuple of (names, bytes).
583
parser = self.make_parser_expecting_bytes_record()
584
parser.accept_bytes(bytes)
585
parsed_records = parser.read_pending_records()
586
self.assertEqual([expected_record], parsed_records)
589
class TestContainerPushParser(PushParserTestCase):
590
"""Tests for ContainerPushParser.
592
The ContainerPushParser reads format 1 containers, so these tests
593
explicitly test how it reacts to format 1 data. If a new version of the
594
format is added, then separate tests for that format should be added.
597
def test_construct(self):
598
"""ContainerPushParser can be constructed."""
599
pack.ContainerPushParser()
601
def test_multiple_records_at_once(self):
602
"""If multiple records worth of data are fed to the parser in one
603
string, the parser will correctly parse all the records.
605
(A naive implementation might stop after parsing the first record.)
607
parser = self.make_parser_expecting_record_type()
608
parser.accept_bytes("B5\nname1\n\nbody1B5\nname2\n\nbody2")
610
[([('name1',)], 'body1'), ([('name2',)], 'body2')],
611
parser.read_pending_records())
614
class TestContainerPushParserBytesParsing(PushParserTestCase):
615
"""Tests for reading Bytes records with ContainerPushParser.
617
The ContainerPushParser reads format 1 containers, so these tests
618
explicitly test how it reacts to format 1 data. If a new version of the
619
format is added, then separate tests for that format should be added.
622
def test_record_with_no_name(self):
623
"""Reading a Bytes record with no name returns an empty list of
626
self.assertRecordParsing(([], 'aaaaa'), "5\n\naaaaa")
628
def test_record_with_one_name(self):
629
"""Reading a Bytes record with one name returns a list of just that
632
self.assertRecordParsing(
633
([('name1', )], 'aaaaa'),
636
def test_record_with_two_names(self):
637
"""Reading a Bytes record with two names returns a list of both names.
639
self.assertRecordParsing(
640
([('name1', ), ('name2', )], 'aaaaa'),
641
"5\nname1\nname2\n\naaaaa")
643
def test_record_with_two_part_names(self):
644
"""Reading a Bytes record with a two_part name reads both."""
645
self.assertRecordParsing(
646
([('name1', 'name2')], 'aaaaa'),
647
"5\nname1\x00name2\n\naaaaa")
649
def test_invalid_length(self):
650
"""If the length-prefix is not a number, parsing raises
653
parser = self.make_parser_expecting_bytes_record()
655
errors.InvalidRecordError, parser.accept_bytes, "not a number\n")
657
def test_incomplete_record(self):
658
"""If the bytes seen so far don't form a complete record, then there
659
will be nothing returned by read_pending_records.
661
parser = self.make_parser_expecting_bytes_record()
662
parser.accept_bytes("5\n\nabcd")
663
self.assertEqual([], parser.read_pending_records())
665
def test_accept_nothing(self):
666
"""The edge case of parsing an empty string causes no error."""
667
parser = self.make_parser_expecting_bytes_record()
668
parser.accept_bytes("")
670
def assertInvalidRecord(self, bytes):
671
"""Assert that parsing the given bytes will raise an
674
parser = self.make_parser_expecting_bytes_record()
676
errors.InvalidRecordError, parser.accept_bytes, bytes)
678
def test_read_invalid_name_whitespace(self):
679
"""Names must have no whitespace."""
680
# A name with a space.
681
self.assertInvalidRecord("0\nbad name\n\n")
684
self.assertInvalidRecord("0\nbad\tname\n\n")
686
# A name with a vertical tab.
687
self.assertInvalidRecord("0\nbad\vname\n\n")
689
def test_repeated_read_pending_records(self):
690
"""read_pending_records will not return the same record twice."""
691
parser = self.make_parser_expecting_bytes_record()
692
parser.accept_bytes("6\n\nabcdef")
693
self.assertEqual([([], 'abcdef')], parser.read_pending_records())
694
self.assertEqual([], parser.read_pending_records())