1
# Copyright (C) 2007, 2009, 2011, 2012, 2016 Canonical Ltd
3
# This program is free software; you can redistribute it and/or modify
4
# it under the terms of the GNU General Public License as published by
5
# the Free Software Foundation; either version 2 of the License, or
6
# (at your option) any later version.
8
# This program is distributed in the hope that it will be useful,
9
# but WITHOUT ANY WARRANTY; without even the implied warranty of
10
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11
# GNU General Public License for more details.
13
# You should have received a copy of the GNU General Public License
14
# along with this program; if not, write to the Free Software
15
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
17
"""Tests for breezy.pack."""
19
from .. import errors, tests
23
from ..sixish import (
28
class TestContainerSerialiser(tests.TestCase):
    """Tests for the ContainerSerialiser class."""

    def test_construct(self):
        """A ContainerSerialiser can be constructed without arguments."""
        pack.ContainerSerialiser()

    def test_begin(self):
        """begin() returns the container format marker line."""
        serialiser = pack.ContainerSerialiser()
        self.assertEqual(b'Bazaar pack format 1 (introduced in 0.18)\n',
                         serialiser.begin())

    def test_end(self):
        """end() returns the single-byte End Marker record."""
        serialiser = pack.ContainerSerialiser()
        self.assertEqual(b'E', serialiser.end())

    def test_bytes_record_no_name(self):
        """A record without names has an empty name list before the body."""
        serialiser = pack.ContainerSerialiser()
        record = serialiser.bytes_record(b'bytes', [])
        self.assertEqual(b'B5\n\nbytes', record)

    def test_bytes_record_one_name_with_one_part(self):
        """A one-part name is written on its own line."""
        serialiser = pack.ContainerSerialiser()
        record = serialiser.bytes_record(b'bytes', [(b'name',)])
        self.assertEqual(b'B5\nname\n\nbytes', record)

    def test_bytes_record_one_name_with_two_parts(self):
        """The parts of a multi-part name are joined with a NUL byte."""
        serialiser = pack.ContainerSerialiser()
        record = serialiser.bytes_record(b'bytes', [(b'part1', b'part2')])
        self.assertEqual(b'B5\npart1\x00part2\n\nbytes', record)

    def test_bytes_record_two_names(self):
        """Each name of a record is written on a separate line."""
        serialiser = pack.ContainerSerialiser()
        record = serialiser.bytes_record(b'bytes', [(b'name1',), (b'name2',)])
        self.assertEqual(b'B5\nname1\nname2\n\nbytes', record)

    def test_bytes_record_whitespace_in_name_part(self):
        """A name part containing whitespace raises InvalidRecordError."""
        serialiser = pack.ContainerSerialiser()
        self.assertRaises(
            errors.InvalidRecordError,
            serialiser.bytes_record, b'bytes', [(b'bad name',)])

    def test_bytes_record_header(self):
        """bytes_header() emits the length and names but no body."""
        serialiser = pack.ContainerSerialiser()
        record = serialiser.bytes_header(32, [(b'name1',), (b'name2',)])
        self.assertEqual(b'B32\nname1\nname2\n\n', record)
76
class TestContainerWriter(tests.TestCase):
    """Tests for the ContainerWriter class.

    Every test writes through ``self.writer`` into the in-memory
    ``self.output`` buffer created by setUp.
    """

    def setUp(self):
        super(TestContainerWriter, self).setUp()
        self.output = BytesIO()
        self.writer = pack.ContainerWriter(self.output.write)

    def assertOutput(self, expected_output):
        """Assert that everything written by self.writer so far equals
        expected_output.

        :param expected_output: the exact bytes expected in self.output.
        """
        self.assertEqual(expected_output, self.output.getvalue())

    def test_construct(self):
        """Test constructing a ContainerWriter.

        This uses None as the output stream to show that the constructor
        doesn't try to use the output stream.
        """
        pack.ContainerWriter(None)

    def test_begin(self):
        """The begin() method writes the container format marker line."""
        self.writer.begin()
        self.assertOutput(b'Bazaar pack format 1 (introduced in 0.18)\n')

    def test_zero_records_written_after_begin(self):
        """After begin is written, 0 records have been written."""
        self.writer.begin()
        self.assertEqual(0, self.writer.records_written)

    def test_end(self):
        """The end() method writes an End Marker record."""
        self.writer.begin()
        self.writer.end()
        self.assertOutput(b'Bazaar pack format 1 (introduced in 0.18)\nE')

    def test_empty_end_does_not_add_a_record_to_records_written(self):
        """The end() method does not count towards the records written."""
        self.writer.begin()
        self.writer.end()
        self.assertEqual(0, self.writer.records_written)

    def test_non_empty_end_does_not_add_a_record_to_records_written(self):
        """The end() method does not count towards the records written."""
        self.writer.begin()
        self.writer.add_bytes_record(b'foo', names=[])
        self.writer.end()
        self.assertEqual(1, self.writer.records_written)

    def test_add_bytes_record_no_name(self):
        """Add a bytes record with no name."""
        self.writer.begin()
        offset, length = self.writer.add_bytes_record(b'abc', names=[])
        # The 42-byte format marker precedes the record; the record itself
        # is b'B3\n\nabc' (7 bytes).
        self.assertEqual((42, 7), (offset, length))
        self.assertOutput(
            b'Bazaar pack format 1 (introduced in 0.18)\nB3\n\nabc')

    def test_add_bytes_record_one_name(self):
        """Add a bytes record with one name."""
        self.writer.begin()

        offset, length = self.writer.add_bytes_record(
            b'abc', names=[(b'name1', )])
        self.assertEqual((42, 13), (offset, length))
        self.assertOutput(
            b'Bazaar pack format 1 (introduced in 0.18)\n'
            b'B3\nname1\n\nabc')

    def test_add_bytes_record_split_writes(self):
        """Write a large record which does multiple IOs"""

        # Wrap the writer's write function so each individual write call is
        # captured in order.
        writes = []
        real_write = self.writer.write_func

        def record_writes(data):
            writes.append(data)
            return real_write(data)

        self.writer.write_func = record_writes
        # NOTE(review): presumably bodies longer than this threshold are
        # written separately from their header -- confirm against
        # pack.ContainerWriter.
        self.writer._JOIN_WRITES_THRESHOLD = 2

        self.writer.begin()
        offset, length = self.writer.add_bytes_record(
            b'abcabc', names=[(b'name1', )])
        self.assertEqual((42, 16), (offset, length))
        self.assertOutput(
            b'Bazaar pack format 1 (introduced in 0.18)\n'
            b'B6\nname1\n\nabcabc')

        # NOTE(review): the expected split below (header, then body) was
        # rebuilt from the offsets asserted above; verify against history.
        self.assertEqual([
            b'Bazaar pack format 1 (introduced in 0.18)\n',
            b'B6\nname1\n\n',
            b'abcabc'],
            writes)

    def test_add_bytes_record_two_names(self):
        """Add a bytes record with two names."""
        self.writer.begin()
        offset, length = self.writer.add_bytes_record(
            b'abc', names=[(b'name1', ), (b'name2', )])
        self.assertEqual((42, 19), (offset, length))
        self.assertOutput(
            b'Bazaar pack format 1 (introduced in 0.18)\n'
            b'B3\nname1\nname2\n\nabc')

    def test_add_bytes_record_two_element_name(self):
        """Add a bytes record with a two-element name."""
        self.writer.begin()
        offset, length = self.writer.add_bytes_record(
            b'abc', names=[(b'name1', b'name2')])
        self.assertEqual((42, 19), (offset, length))
        self.assertOutput(
            b'Bazaar pack format 1 (introduced in 0.18)\n'
            b'B3\nname1\x00name2\n\nabc')

    def test_add_second_bytes_record_gets_higher_offset(self):
        """The second record's offset follows the end of the first."""
        self.writer.begin()
        self.writer.add_bytes_record(b'abc', names=[])
        offset, length = self.writer.add_bytes_record(b'abc', names=[])
        # 42 bytes of format marker + 7 bytes for the first record.
        self.assertEqual((49, 7), (offset, length))
        self.assertOutput(
            b'Bazaar pack format 1 (introduced in 0.18)\n'
            b'B3\n\nabc'
            b'B3\n\nabc')

    def test_add_bytes_record_invalid_name(self):
        """Adding a Bytes record with a name with whitespace in it raises
        InvalidRecordError.
        """
        self.writer.begin()
        self.assertRaises(
            errors.InvalidRecordError,
            self.writer.add_bytes_record, b'abc', names=[(b'bad name', )])

    def test_add_bytes_records_add_to_records_written(self):
        """Adding a Bytes record increments the records_written counter."""
        self.writer.begin()
        self.writer.add_bytes_record(b'foo', names=[])
        self.assertEqual(1, self.writer.records_written)
        self.writer.add_bytes_record(b'foo', names=[])
        self.assertEqual(2, self.writer.records_written)
230
class TestContainerReader(tests.TestCase):
    """Tests for the ContainerReader.

    The ContainerReader reads format 1 containers, so these tests explicitly
    test how it reacts to format 1 data.  If a new version of the format is
    added, then separate tests for that format should be added.
    """

    def get_reader_for(self, data):
        """Return a ContainerReader reading from an in-memory copy of data.

        :param data: the raw container bytes to feed to the reader.
        """
        stream = BytesIO(data)
        reader = pack.ContainerReader(stream)
        return reader

    def test_construct(self):
        """Test constructing a ContainerReader.

        This uses None as the input stream to show that the constructor
        doesn't try to use the input stream.
        """
        pack.ContainerReader(None)

    def test_empty_container(self):
        """Read an empty container."""
        reader = self.get_reader_for(
            b"Bazaar pack format 1 (introduced in 0.18)\nE")
        self.assertEqual([], list(reader.iter_records()))

    def test_unknown_format(self):
        """Unrecognised container formats raise UnknownContainerFormatError."""
        reader = self.get_reader_for(b"unknown format\n")
        self.assertRaises(
            errors.UnknownContainerFormatError, reader.iter_records)

    def test_unexpected_end_of_container(self):
        """Containers that don't end with an End Marker record should cause
        UnexpectedEndOfContainerError to be raised.
        """
        reader = self.get_reader_for(
            b"Bazaar pack format 1 (introduced in 0.18)\n")
        iterator = reader.iter_records()
        self.assertRaises(
            errors.UnexpectedEndOfContainerError, next, iterator)

    def test_unknown_record_type(self):
        """Unknown record types cause UnknownRecordTypeError to be raised."""
        reader = self.get_reader_for(
            b"Bazaar pack format 1 (introduced in 0.18)\nX")
        iterator = reader.iter_records()
        self.assertRaises(
            errors.UnknownRecordTypeError, next, iterator)

    def test_container_with_one_unnamed_record(self):
        """Read a container with one Bytes record.

        Parsing Bytes records is more thoroughly exercised by
        TestBytesRecordReader.  This test is here to ensure that
        ContainerReader's integration with BytesRecordReader is working.
        """
        reader = self.get_reader_for(
            b"Bazaar pack format 1 (introduced in 0.18)\nB5\n\naaaaaE")
        expected_records = [([], b'aaaaa')]
        self.assertEqual(
            expected_records,
            [(names, read_bytes(None))
             for (names, read_bytes) in reader.iter_records()])

    def test_validate_empty_container(self):
        """validate does not raise an error for a container with no records."""
        reader = self.get_reader_for(
            b"Bazaar pack format 1 (introduced in 0.18)\nE")
        # No exception raised
        reader.validate()

    def test_validate_non_empty_valid_container(self):
        """validate does not raise an error for a container with a valid record.
        """
        reader = self.get_reader_for(
            b"Bazaar pack format 1 (introduced in 0.18)\nB3\nname\n\nabcE")
        # No exception raised
        reader.validate()

    def test_validate_bad_format(self):
        """validate raises an error for unrecognised format strings.

        It may raise either UnexpectedEndOfContainerError or
        UnknownContainerFormatError, depending on exactly what the string is.
        """
        bad_inputs = [
            b"", b"x", b"Bazaar pack format 1 (introduced in 0.18)", b"bad\n"]
        for bad_input in bad_inputs:
            reader = self.get_reader_for(bad_input)
            self.assertRaises(
                (errors.UnexpectedEndOfContainerError,
                 errors.UnknownContainerFormatError),
                reader.validate)

    def test_validate_bad_record_marker(self):
        """validate raises UnknownRecordTypeError for unrecognised record
        types.
        """
        reader = self.get_reader_for(
            b"Bazaar pack format 1 (introduced in 0.18)\nX")
        self.assertRaises(errors.UnknownRecordTypeError, reader.validate)

    def test_validate_data_after_end_marker(self):
        """validate raises ContainerHasExcessDataError if there are any bytes
        after the end of the container.
        """
        reader = self.get_reader_for(
            b"Bazaar pack format 1 (introduced in 0.18)\nEcrud")
        self.assertRaises(
            errors.ContainerHasExcessDataError, reader.validate)

    def test_validate_no_end_marker(self):
        """validate raises UnexpectedEndOfContainerError if there's no end of
        container marker, even if the container up to this point has been valid.
        """
        reader = self.get_reader_for(
            b"Bazaar pack format 1 (introduced in 0.18)\n")
        self.assertRaises(
            errors.UnexpectedEndOfContainerError, reader.validate)

    def test_validate_duplicate_name(self):
        """validate raises DuplicateRecordNameError if the same name occurs
        multiple times in the container.
        """
        # NOTE(review): the two records below were rebuilt from the
        # docstring (same name used twice); verify against history.
        reader = self.get_reader_for(
            b"Bazaar pack format 1 (introduced in 0.18)\n"
            b"B0\nname\n\n"
            b"B0\nname\n\n"
            b"E")
        self.assertRaises(errors.DuplicateRecordNameError, reader.validate)

    def test_validate_undecodeable_name(self):
        """Names that aren't valid UTF-8 cause validate to fail."""
        reader = self.get_reader_for(
            b"Bazaar pack format 1 (introduced in 0.18)\nB0\n\xcc\n\nE")
        self.assertRaises(errors.InvalidRecordError, reader.validate)
370
class TestBytesRecordReader(tests.TestCase):
    """Tests for reading and validating Bytes records with
    BytesRecordReader.

    Like TestContainerReader, this explicitly tests the reading of format 1
    data.  If a new version of the format is added, then a separate set of
    tests for reading that format should be added.
    """

    def get_reader_for(self, data):
        """Return a BytesRecordReader reading from an in-memory copy of data.

        :param data: the record bytes, starting just after the record-type
            marker (the inputs used in this class carry no leading b'B').
        """
        stream = BytesIO(data)
        reader = pack.BytesRecordReader(stream)
        return reader

    def test_record_with_no_name(self):
        """Reading a Bytes record with no name returns an empty list of
        names.
        """
        reader = self.get_reader_for(b"5\n\naaaaa")
        names, get_bytes = reader.read()
        self.assertEqual([], names)
        self.assertEqual(b'aaaaa', get_bytes(None))

    def test_record_with_one_name(self):
        """Reading a Bytes record with one name returns a list of just that
        name.
        """
        reader = self.get_reader_for(b"5\nname1\n\naaaaa")
        names, get_bytes = reader.read()
        self.assertEqual([(b'name1', )], names)
        self.assertEqual(b'aaaaa', get_bytes(None))

    def test_record_with_two_names(self):
        """Reading a Bytes record with two names returns a list of both names.
        """
        reader = self.get_reader_for(b"5\nname1\nname2\n\naaaaa")
        names, get_bytes = reader.read()
        self.assertEqual([(b'name1', ), (b'name2', )], names)
        self.assertEqual(b'aaaaa', get_bytes(None))

    def test_record_with_two_part_names(self):
        """Reading a Bytes record with a two_part name reads both."""
        reader = self.get_reader_for(b"5\nname1\x00name2\n\naaaaa")
        names, get_bytes = reader.read()
        self.assertEqual([(b'name1', b'name2', )], names)
        self.assertEqual(b'aaaaa', get_bytes(None))

    def test_invalid_length(self):
        """If the length-prefix is not a number, parsing raises
        InvalidRecordError.
        """
        reader = self.get_reader_for(b"not a number\n")
        self.assertRaises(errors.InvalidRecordError, reader.read)

    def test_early_eof(self):
        """Tests for premature EOF occurring during parsing Bytes records with
        BytesRecordReader.

        An incomplete container might be interrupted at any point.  The
        BytesRecordReader needs to cope with the input stream running out no
        matter where it is in the parsing process.

        In all cases, UnexpectedEndOfContainerError should be raised.
        """
        complete_record = b"6\nname\n\nabcdef"
        for count in range(len(complete_record)):
            incomplete_record = complete_record[:count]
            reader = self.get_reader_for(incomplete_record)
            # We don't use assertRaises to make diagnosing failures easier
            # (assertRaises doesn't allow a custom failure message).
            try:
                names, read_bytes = reader.read()
                # A truncation inside the body only shows up once the body
                # is actually read.
                read_bytes(None)
            except errors.UnexpectedEndOfContainerError:
                pass
            else:
                self.fail(
                    "UnexpectedEndOfContainerError not raised when parsing %r"
                    % (incomplete_record,))

    def test_initial_eof(self):
        """EOF before any bytes read at all."""
        reader = self.get_reader_for(b"")
        self.assertRaises(errors.UnexpectedEndOfContainerError, reader.read)

    def test_eof_after_length(self):
        """EOF after reading the length and before reading name(s)."""
        reader = self.get_reader_for(b"123\n")
        self.assertRaises(errors.UnexpectedEndOfContainerError, reader.read)

    def test_eof_during_name(self):
        """EOF during reading a name."""
        reader = self.get_reader_for(b"123\nname")
        self.assertRaises(errors.UnexpectedEndOfContainerError, reader.read)

    def test_read_invalid_name_whitespace(self):
        """Names must have no whitespace."""
        # A name with a space.
        reader = self.get_reader_for(b"0\nbad name\n\n")
        self.assertRaises(errors.InvalidRecordError, reader.read)

        # A name with a tab.
        reader = self.get_reader_for(b"0\nbad\tname\n\n")
        self.assertRaises(errors.InvalidRecordError, reader.read)

        # A name with a vertical tab.
        reader = self.get_reader_for(b"0\nbad\vname\n\n")
        self.assertRaises(errors.InvalidRecordError, reader.read)

    def test_validate_whitespace_in_name(self):
        """Names must have no whitespace."""
        reader = self.get_reader_for(b"0\nbad name\n\n")
        self.assertRaises(errors.InvalidRecordError, reader.validate)

    def test_validate_interrupted_prelude(self):
        """EOF during reading a record's prelude causes validate to fail."""
        reader = self.get_reader_for(b"")
        self.assertRaises(
            errors.UnexpectedEndOfContainerError, reader.validate)

    def test_validate_interrupted_body(self):
        """EOF during reading a record's body causes validate to fail."""
        reader = self.get_reader_for(b"1\n\n")
        self.assertRaises(
            errors.UnexpectedEndOfContainerError, reader.validate)

    def test_validate_unparseable_length(self):
        """An unparseable record length causes validate to fail."""
        reader = self.get_reader_for(b"\n\n")
        self.assertRaises(
            errors.InvalidRecordError, reader.validate)

    def test_validate_undecodeable_name(self):
        """Names that aren't valid UTF-8 cause validate to fail."""
        reader = self.get_reader_for(b"0\n\xcc\n\n")
        self.assertRaises(errors.InvalidRecordError, reader.validate)

    def test_read_max_length(self):
        """If the max_length passed to the callable returned by read is not
        None, then no more than that many bytes will be read.
        """
        reader = self.get_reader_for(b"6\n\nabcdef")
        names, get_bytes = reader.read()
        self.assertEqual(b'abc', get_bytes(3))

    def test_read_no_max_length(self):
        """If the max_length passed to the callable returned by read is None,
        then all the bytes in the record will be read.
        """
        reader = self.get_reader_for(b"6\n\nabcdef")
        names, get_bytes = reader.read()
        self.assertEqual(b'abcdef', get_bytes(None))

    def test_repeated_read_calls(self):
        """Repeated calls to the callable returned from BytesRecordReader.read
        will not read beyond the end of the record.
        """
        reader = self.get_reader_for(b"6\n\nabcdefB3\nnext-record\nXXX")
        names, get_bytes = reader.read()
        self.assertEqual(b'abcdef', get_bytes(None))
        self.assertEqual(b'', get_bytes(None))
        self.assertEqual(b'', get_bytes(99))
534
class TestMakeReadvReader(tests.TestCaseWithTransport):
    """Tests for pack.make_readv_reader."""

    def test_read_skipping_records(self):
        """Only the records whose memos are requested are yielded."""
        pack_data = BytesIO()
        writer = pack.ContainerWriter(pack_data.write)
        # NOTE(review): the begin()/end() calls sit where the damaged source
        # dropped lines; confirm against history.
        writer.begin()
        # Each memo is the value returned by add_bytes_record for that
        # record; the other tests in this file unpack it as (offset, length).
        memos = []
        memos.append(writer.add_bytes_record(b'abc', names=[]))
        memos.append(writer.add_bytes_record(b'def', names=[(b'name1', )]))
        memos.append(writer.add_bytes_record(b'ghi', names=[(b'name2', )]))
        memos.append(writer.add_bytes_record(b'jkl', names=[]))
        writer.end()
        transport = self.get_transport()
        transport.put_bytes('mypack', pack_data.getvalue())
        # Ask for the first and third records only.
        requested_records = [memos[0], memos[2]]
        reader = pack.make_readv_reader(transport, 'mypack', requested_records)
        result = []
        for names, reader_func in reader.iter_records():
            result.append((names, reader_func(None)))
        self.assertEqual([([], b'abc'), ([(b'name2', )], b'ghi')], result)
556
class TestReadvFile(tests.TestCaseWithTransport):
    """Tests of the ReadVFile class.

    Error cases are deliberately undefined: this code adapts the underlying
    transport interface to a single 'streaming read' interface as
    ContainerReader needs.
    """

    def test_read_bytes(self):
        """Test reading of both single bytes and all bytes in a hunk."""
        transport = self.get_transport()
        transport.put_bytes('sample', b'0123456789')
        f = pack.ReadVFile(
            transport.readv('sample', [(0, 1), (1, 2), (4, 1), (6, 2)]))
        results = []
        results.append(f.read(1))
        results.append(f.read(2))
        results.append(f.read(1))
        results.append(f.read(1))
        results.append(f.read(1))
        self.assertEqual([b'0', b'12', b'4', b'6', b'7'], results)

    def test_readline(self):
        """Test using readline() as ContainerReader does.

        This is always within a readv hunk, never across it.
        """
        transport = self.get_transport()
        transport.put_bytes('sample', b'0\n2\n4\n')
        f = pack.ReadVFile(transport.readv('sample', [(0, 2), (2, 4)]))
        results = []
        results.append(f.readline())
        results.append(f.readline())
        results.append(f.readline())
        self.assertEqual([b'0\n', b'2\n', b'4\n'], results)

    def test_readline_and_read(self):
        """Test exercising one byte reads, readline, and then read again."""
        transport = self.get_transport()
        transport.put_bytes('sample', b'0\n2\n4\n')
        f = pack.ReadVFile(transport.readv('sample', [(0, 6)]))
        results = []
        results.append(f.read(1))
        results.append(f.readline())
        results.append(f.read(4))
        self.assertEqual([b'0', b'\n', b'2\n4\n'], results)
603
class PushParserTestCase(tests.TestCase):
    """Base class for TestCases involving ContainerPushParser."""

    def make_parser_expecting_record_type(self):
        """Return a parser that has been fed only the format marker line."""
        parser = pack.ContainerPushParser()
        parser.accept_bytes(b"Bazaar pack format 1 (introduced in 0.18)\n")
        return parser

    def make_parser_expecting_bytes_record(self):
        """Return a parser that has been fed the format marker line and the
        b'B' record-type byte.
        """
        parser = pack.ContainerPushParser()
        parser.accept_bytes(b"Bazaar pack format 1 (introduced in 0.18)\nB")
        return parser

    def assertRecordParsing(self, expected_record, data):
        """Assert that 'data' is parsed as a given bytes record.

        :param expected_record: A tuple of (names, bytes).
        :param data: the record bytes following the b'B' record-type byte.
        """
        parser = self.make_parser_expecting_bytes_record()
        parser.accept_bytes(data)
        parsed_records = parser.read_pending_records()
        self.assertEqual([expected_record], parsed_records)
627
class TestContainerPushParser(PushParserTestCase):
    """Tests for ContainerPushParser.

    The ContainerPushParser reads format 1 containers, so these tests
    explicitly test how it reacts to format 1 data.  If a new version of the
    format is added, then separate tests for that format should be added.
    """

    def test_construct(self):
        """ContainerPushParser can be constructed."""
        pack.ContainerPushParser()

    def test_multiple_records_at_once(self):
        """If multiple records worth of data are fed to the parser in one
        string, the parser will correctly parse all the records.

        (A naive implementation might stop after parsing the first record.)
        """
        parser = self.make_parser_expecting_record_type()
        parser.accept_bytes(b"B5\nname1\n\nbody1B5\nname2\n\nbody2")
        self.assertEqual(
            [([(b'name1',)], b'body1'), ([(b'name2',)], b'body2')],
            parser.read_pending_records())

    def test_multiple_empty_records_at_once(self):
        """If multiple empty records worth of data are fed to the parser in one
        string, the parser will correctly parse all the records.

        (A naive implementation might stop after parsing the first empty
        record, because the buffer size had not changed.)
        """
        parser = self.make_parser_expecting_record_type()
        parser.accept_bytes(b"B0\nname1\n\nB0\nname2\n\n")
        self.assertEqual(
            [([(b'name1',)], b''), ([(b'name2',)], b'')],
            parser.read_pending_records())
665
class TestContainerPushParserBytesParsing(PushParserTestCase):
    """Tests for reading Bytes records with ContainerPushParser.

    The ContainerPushParser reads format 1 containers, so these tests
    explicitly test how it reacts to format 1 data.  If a new version of the
    format is added, then separate tests for that format should be added.
    """

    def test_record_with_no_name(self):
        """Reading a Bytes record with no name returns an empty list of
        names.
        """
        self.assertRecordParsing(([], b'aaaaa'), b"5\n\naaaaa")

    def test_record_with_one_name(self):
        """Reading a Bytes record with one name returns a list of just that
        name.
        """
        self.assertRecordParsing(
            ([(b'name1', )], b'aaaaa'),
            b"5\nname1\n\naaaaa")

    def test_record_with_two_names(self):
        """Reading a Bytes record with two names returns a list of both names.
        """
        self.assertRecordParsing(
            ([(b'name1', ), (b'name2', )], b'aaaaa'),
            b"5\nname1\nname2\n\naaaaa")

    def test_record_with_two_part_names(self):
        """Reading a Bytes record with a two_part name reads both."""
        self.assertRecordParsing(
            ([(b'name1', b'name2')], b'aaaaa'),
            b"5\nname1\x00name2\n\naaaaa")

    def test_invalid_length(self):
        """If the length-prefix is not a number, parsing raises
        InvalidRecordError.
        """
        parser = self.make_parser_expecting_bytes_record()
        self.assertRaises(
            errors.InvalidRecordError, parser.accept_bytes, b"not a number\n")

    def test_incomplete_record(self):
        """If the bytes seen so far don't form a complete record, then there
        will be nothing returned by read_pending_records.
        """
        parser = self.make_parser_expecting_bytes_record()
        # The header promises 5 body bytes but only 4 are supplied.
        parser.accept_bytes(b"5\n\nabcd")
        self.assertEqual([], parser.read_pending_records())

    def test_accept_nothing(self):
        """The edge case of parsing an empty string causes no error."""
        parser = self.make_parser_expecting_bytes_record()
        parser.accept_bytes(b"")

    def assertInvalidRecord(self, data):
        """Assert that parsing the given bytes raises InvalidRecordError."""
        parser = self.make_parser_expecting_bytes_record()
        self.assertRaises(
            errors.InvalidRecordError, parser.accept_bytes, data)

    def test_read_invalid_name_whitespace(self):
        """Names must have no whitespace."""
        # A name with a space.
        self.assertInvalidRecord(b"0\nbad name\n\n")

        # A name with a tab.
        self.assertInvalidRecord(b"0\nbad\tname\n\n")

        # A name with a vertical tab.
        self.assertInvalidRecord(b"0\nbad\vname\n\n")

    def test_repeated_read_pending_records(self):
        """read_pending_records will not return the same record twice."""
        parser = self.make_parser_expecting_bytes_record()
        parser.accept_bytes(b"6\n\nabcdef")
        self.assertEqual([([], b'abcdef')], parser.read_pending_records())
        self.assertEqual([], parser.read_pending_records())