1
# Copyright (C) 2007, 2009, 2011, 2012, 2016 Canonical Ltd
3
# This program is free software; you can redistribute it and/or modify
4
# it under the terms of the GNU General Public License as published by
5
# the Free Software Foundation; either version 2 of the License, or
6
# (at your option) any later version.
8
# This program is distributed in the hope that it will be useful,
9
# but WITHOUT ANY WARRANTY; without even the implied warranty of
10
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11
# GNU General Public License for more details.
13
# You should have received a copy of the GNU General Public License
14
# along with this program; if not, write to the Free Software
15
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
17
"""Tests for breezy.pack."""
19
from io import BytesIO
21
from .. import errors, tests
27
class TestContainerSerialiser(tests.TestCase):
28
"""Tests for the ContainerSerialiser class."""
30
def test_construct(self):
31
"""Test constructing a ContainerSerialiser."""
32
pack.ContainerSerialiser()
35
serialiser = pack.ContainerSerialiser()
36
self.assertEqual(b'Bazaar pack format 1 (introduced in 0.18)\n',
40
serialiser = pack.ContainerSerialiser()
41
self.assertEqual(b'E', serialiser.end())
43
def test_bytes_record_no_name(self):
44
serialiser = pack.ContainerSerialiser()
45
record = serialiser.bytes_record(b'bytes', [])
46
self.assertEqual(b'B5\n\nbytes', record)
48
def test_bytes_record_one_name_with_one_part(self):
49
serialiser = pack.ContainerSerialiser()
50
record = serialiser.bytes_record(b'bytes', [(b'name',)])
51
self.assertEqual(b'B5\nname\n\nbytes', record)
53
def test_bytes_record_one_name_with_two_parts(self):
54
serialiser = pack.ContainerSerialiser()
55
record = serialiser.bytes_record(b'bytes', [(b'part1', b'part2')])
56
self.assertEqual(b'B5\npart1\x00part2\n\nbytes', record)
58
def test_bytes_record_two_names(self):
59
serialiser = pack.ContainerSerialiser()
60
record = serialiser.bytes_record(b'bytes', [(b'name1',), (b'name2',)])
61
self.assertEqual(b'B5\nname1\nname2\n\nbytes', record)
63
def test_bytes_record_whitespace_in_name_part(self):
64
serialiser = pack.ContainerSerialiser()
66
errors.InvalidRecordError,
67
serialiser.bytes_record, b'bytes', [(b'bad name',)])
69
def test_bytes_record_header(self):
70
serialiser = pack.ContainerSerialiser()
71
record = serialiser.bytes_header(32, [(b'name1',), (b'name2',)])
72
self.assertEqual(b'B32\nname1\nname2\n\n', record)
75
class TestContainerWriter(tests.TestCase):
78
super(TestContainerWriter, self).setUp()
79
self.output = BytesIO()
80
self.writer = pack.ContainerWriter(self.output.write)
82
def assertOutput(self, expected_output):
83
"""Assert that the output of self.writer ContainerWriter is equal to
86
self.assertEqual(expected_output, self.output.getvalue())
88
def test_construct(self):
89
"""Test constructing a ContainerWriter.
91
This uses None as the output stream to show that the constructor
92
doesn't try to use the output stream.
94
pack.ContainerWriter(None)
97
"""The begin() method writes the container format marker line."""
99
self.assertOutput(b'Bazaar pack format 1 (introduced in 0.18)\n')
101
def test_zero_records_written_after_begin(self):
102
"""After begin is written, 0 records have been written."""
104
self.assertEqual(0, self.writer.records_written)
107
"""The end() method writes an End Marker record."""
110
self.assertOutput(b'Bazaar pack format 1 (introduced in 0.18)\nE')
112
def test_empty_end_does_not_add_a_record_to_records_written(self):
113
"""The end() method does not count towards the records written."""
116
self.assertEqual(0, self.writer.records_written)
118
def test_non_empty_end_does_not_add_a_record_to_records_written(self):
119
"""The end() method does not count towards the records written."""
121
self.writer.add_bytes_record([b'foo'], len(b'foo'), names=[])
123
self.assertEqual(1, self.writer.records_written)
125
def test_add_bytes_record_no_name(self):
126
"""Add a bytes record with no name."""
128
offset, length = self.writer.add_bytes_record([b'abc'], len(b'abc'), names=[])
129
self.assertEqual((42, 7), (offset, length))
131
b'Bazaar pack format 1 (introduced in 0.18)\nB3\n\nabc')
133
def test_add_bytes_record_one_name(self):
134
"""Add a bytes record with one name."""
137
offset, length = self.writer.add_bytes_record(
138
[b'abc'], len(b'abc'), names=[(b'name1', )])
139
self.assertEqual((42, 13), (offset, length))
141
b'Bazaar pack format 1 (introduced in 0.18)\n'
144
def test_add_bytes_record_split_writes(self):
145
"""Write a large record which does multiple IOs"""
148
real_write = self.writer.write_func
150
def record_writes(data):
152
return real_write(data)
154
self.writer.write_func = record_writes
155
self.writer._JOIN_WRITES_THRESHOLD = 2
158
offset, length = self.writer.add_bytes_record(
159
[b'abcabc'], len(b'abcabc'), names=[(b'name1', )])
160
self.assertEqual((42, 16), (offset, length))
162
b'Bazaar pack format 1 (introduced in 0.18)\n'
163
b'B6\nname1\n\nabcabc')
166
b'Bazaar pack format 1 (introduced in 0.18)\n',
171
def test_add_bytes_record_two_names(self):
172
"""Add a bytes record with two names."""
174
offset, length = self.writer.add_bytes_record(
175
[b'abc'], len(b'abc'), names=[(b'name1', ), (b'name2', )])
176
self.assertEqual((42, 19), (offset, length))
178
b'Bazaar pack format 1 (introduced in 0.18)\n'
179
b'B3\nname1\nname2\n\nabc')
181
def test_add_bytes_record_two_names(self):
182
"""Add a bytes record with two names."""
184
offset, length = self.writer.add_bytes_record(
185
[b'abc'], len(b'abc'), names=[(b'name1', ), (b'name2', )])
186
self.assertEqual((42, 19), (offset, length))
188
b'Bazaar pack format 1 (introduced in 0.18)\n'
189
b'B3\nname1\nname2\n\nabc')
191
def test_add_bytes_record_two_element_name(self):
192
"""Add a bytes record with a two-element name."""
194
offset, length = self.writer.add_bytes_record(
195
[b'abc'], len(b'abc'), names=[(b'name1', b'name2')])
196
self.assertEqual((42, 19), (offset, length))
198
b'Bazaar pack format 1 (introduced in 0.18)\n'
199
b'B3\nname1\x00name2\n\nabc')
201
def test_add_second_bytes_record_gets_higher_offset(self):
203
self.writer.add_bytes_record([b'a', b'bc'], len(b'abc'), names=[])
204
offset, length = self.writer.add_bytes_record([b'abc'], len(b'abc'), names=[])
205
self.assertEqual((49, 7), (offset, length))
207
b'Bazaar pack format 1 (introduced in 0.18)\n'
211
def test_add_bytes_record_invalid_name(self):
212
"""Adding a Bytes record with a name with whitespace in it raises
217
errors.InvalidRecordError,
218
self.writer.add_bytes_record, [b'abc'], len(b'abc'), names=[(b'bad name', )])
220
def test_add_bytes_records_add_to_records_written(self):
221
"""Adding a Bytes record increments the records_written counter."""
223
self.writer.add_bytes_record([b'foo'], len(b'foo'), names=[])
224
self.assertEqual(1, self.writer.records_written)
225
self.writer.add_bytes_record([b'foo'], len(b'foo'), names=[])
226
self.assertEqual(2, self.writer.records_written)
229
class TestContainerReader(tests.TestCase):
230
"""Tests for the ContainerReader.
232
The ContainerReader reads format 1 containers, so these tests explicitly
233
test how it reacts to format 1 data. If a new version of the format is
234
added, then separate tests for that format should be added.
237
def get_reader_for(self, data):
238
stream = BytesIO(data)
239
reader = pack.ContainerReader(stream)
242
def test_construct(self):
243
"""Test constructing a ContainerReader.
245
This uses None as the output stream to show that the constructor
246
doesn't try to use the input stream.
248
pack.ContainerReader(None)
250
def test_empty_container(self):
251
"""Read an empty container."""
252
reader = self.get_reader_for(
253
b"Bazaar pack format 1 (introduced in 0.18)\nE")
254
self.assertEqual([], list(reader.iter_records()))
256
def test_unknown_format(self):
257
"""Unrecognised container formats raise UnknownContainerFormatError."""
258
reader = self.get_reader_for(b"unknown format\n")
260
errors.UnknownContainerFormatError, reader.iter_records)
262
def test_unexpected_end_of_container(self):
263
"""Containers that don't end with an End Marker record should cause
264
UnexpectedEndOfContainerError to be raised.
266
reader = self.get_reader_for(
267
b"Bazaar pack format 1 (introduced in 0.18)\n")
268
iterator = reader.iter_records()
270
errors.UnexpectedEndOfContainerError, next, iterator)
272
def test_unknown_record_type(self):
273
"""Unknown record types cause UnknownRecordTypeError to be raised."""
274
reader = self.get_reader_for(
275
b"Bazaar pack format 1 (introduced in 0.18)\nX")
276
iterator = reader.iter_records()
278
errors.UnknownRecordTypeError, next, iterator)
280
def test_container_with_one_unnamed_record(self):
281
"""Read a container with one Bytes record.
283
Parsing Bytes records is more thoroughly exercised by
284
TestBytesRecordReader. This test is here to ensure that
285
ContainerReader's integration with BytesRecordReader is working.
287
reader = self.get_reader_for(
288
b"Bazaar pack format 1 (introduced in 0.18)\nB5\n\naaaaaE")
289
expected_records = [([], b'aaaaa')]
292
[(names, read_bytes(None))
293
for (names, read_bytes) in reader.iter_records()])
295
def test_validate_empty_container(self):
296
"""validate does not raise an error for a container with no records."""
297
reader = self.get_reader_for(
298
b"Bazaar pack format 1 (introduced in 0.18)\nE")
299
# No exception raised
302
def test_validate_non_empty_valid_container(self):
303
"""validate does not raise an error for a container with a valid record.
305
reader = self.get_reader_for(
306
b"Bazaar pack format 1 (introduced in 0.18)\nB3\nname\n\nabcE")
307
# No exception raised
310
def test_validate_bad_format(self):
311
"""validate raises an error for unrecognised format strings.
313
It may raise either UnexpectedEndOfContainerError or
314
UnknownContainerFormatError, depending on exactly what the string is.
317
b"", b"x", b"Bazaar pack format 1 (introduced in 0.18)", b"bad\n"]
319
reader = self.get_reader_for(input)
321
(errors.UnexpectedEndOfContainerError,
322
errors.UnknownContainerFormatError),
325
def test_validate_bad_record_marker(self):
326
"""validate raises UnknownRecordTypeError for unrecognised record
329
reader = self.get_reader_for(
330
b"Bazaar pack format 1 (introduced in 0.18)\nX")
331
self.assertRaises(errors.UnknownRecordTypeError, reader.validate)
333
def test_validate_data_after_end_marker(self):
334
"""validate raises ContainerHasExcessDataError if there are any bytes
335
after the end of the container.
337
reader = self.get_reader_for(
338
b"Bazaar pack format 1 (introduced in 0.18)\nEcrud")
340
errors.ContainerHasExcessDataError, reader.validate)
342
def test_validate_no_end_marker(self):
343
"""validate raises UnexpectedEndOfContainerError if there's no end of
344
container marker, even if the container up to this point has been
347
reader = self.get_reader_for(
348
b"Bazaar pack format 1 (introduced in 0.18)\n")
350
errors.UnexpectedEndOfContainerError, reader.validate)
352
def test_validate_duplicate_name(self):
353
"""validate raises DuplicateRecordNameError if the same name occurs
354
multiple times in the container.
356
reader = self.get_reader_for(
357
b"Bazaar pack format 1 (introduced in 0.18)\n"
361
self.assertRaises(errors.DuplicateRecordNameError, reader.validate)
363
def test_validate_undecodeable_name(self):
364
"""Names that aren't valid UTF-8 cause validate to fail."""
365
reader = self.get_reader_for(
366
b"Bazaar pack format 1 (introduced in 0.18)\nB0\n\xcc\n\nE")
367
self.assertRaises(errors.InvalidRecordError, reader.validate)
370
class TestBytesRecordReader(tests.TestCase):
371
"""Tests for reading and validating Bytes records with
374
Like TestContainerReader, this explicitly tests the reading of format 1
375
data. If a new version of the format is added, then a separate set of
376
tests for reading that format should be added.
379
def get_reader_for(self, data):
380
stream = BytesIO(data)
381
reader = pack.BytesRecordReader(stream)
384
def test_record_with_no_name(self):
385
"""Reading a Bytes record with no name returns an empty list of
388
reader = self.get_reader_for(b"5\n\naaaaa")
389
names, get_bytes = reader.read()
390
self.assertEqual([], names)
391
self.assertEqual(b'aaaaa', get_bytes(None))
393
def test_record_with_one_name(self):
394
"""Reading a Bytes record with one name returns a list of just that
397
reader = self.get_reader_for(b"5\nname1\n\naaaaa")
398
names, get_bytes = reader.read()
399
self.assertEqual([(b'name1', )], names)
400
self.assertEqual(b'aaaaa', get_bytes(None))
402
def test_record_with_two_names(self):
403
"""Reading a Bytes record with two names returns a list of both names.
405
reader = self.get_reader_for(b"5\nname1\nname2\n\naaaaa")
406
names, get_bytes = reader.read()
407
self.assertEqual([(b'name1', ), (b'name2', )], names)
408
self.assertEqual(b'aaaaa', get_bytes(None))
410
def test_record_with_two_part_names(self):
411
"""Reading a Bytes record with a two_part name reads both."""
412
reader = self.get_reader_for(b"5\nname1\x00name2\n\naaaaa")
413
names, get_bytes = reader.read()
414
self.assertEqual([(b'name1', b'name2', )], names)
415
self.assertEqual(b'aaaaa', get_bytes(None))
417
def test_invalid_length(self):
418
"""If the length-prefix is not a number, parsing raises
421
reader = self.get_reader_for(b"not a number\n")
422
self.assertRaises(errors.InvalidRecordError, reader.read)
424
def test_early_eof(self):
425
"""Tests for premature EOF occuring during parsing Bytes records with
428
A incomplete container might be interrupted at any point. The
429
BytesRecordReader needs to cope with the input stream running out no
430
matter where it is in the parsing process.
432
In all cases, UnexpectedEndOfContainerError should be raised.
434
complete_record = b"6\nname\n\nabcdef"
435
for count in range(0, len(complete_record)):
436
incomplete_record = complete_record[:count]
437
reader = self.get_reader_for(incomplete_record)
438
# We don't use assertRaises to make diagnosing failures easier
439
# (assertRaises doesn't allow a custom failure message).
441
names, read_bytes = reader.read()
443
except errors.UnexpectedEndOfContainerError:
447
"UnexpectedEndOfContainerError not raised when parsing %r"
448
% (incomplete_record,))
450
def test_initial_eof(self):
451
"""EOF before any bytes read at all."""
452
reader = self.get_reader_for(b"")
453
self.assertRaises(errors.UnexpectedEndOfContainerError, reader.read)
455
def test_eof_after_length(self):
456
"""EOF after reading the length and before reading name(s)."""
457
reader = self.get_reader_for(b"123\n")
458
self.assertRaises(errors.UnexpectedEndOfContainerError, reader.read)
460
def test_eof_during_name(self):
461
"""EOF during reading a name."""
462
reader = self.get_reader_for(b"123\nname")
463
self.assertRaises(errors.UnexpectedEndOfContainerError, reader.read)
465
def test_read_invalid_name_whitespace(self):
466
"""Names must have no whitespace."""
467
# A name with a space.
468
reader = self.get_reader_for(b"0\nbad name\n\n")
469
self.assertRaises(errors.InvalidRecordError, reader.read)
472
reader = self.get_reader_for(b"0\nbad\tname\n\n")
473
self.assertRaises(errors.InvalidRecordError, reader.read)
475
# A name with a vertical tab.
476
reader = self.get_reader_for(b"0\nbad\vname\n\n")
477
self.assertRaises(errors.InvalidRecordError, reader.read)
479
def test_validate_whitespace_in_name(self):
480
"""Names must have no whitespace."""
481
reader = self.get_reader_for(b"0\nbad name\n\n")
482
self.assertRaises(errors.InvalidRecordError, reader.validate)
484
def test_validate_interrupted_prelude(self):
485
"""EOF during reading a record's prelude causes validate to fail."""
486
reader = self.get_reader_for(b"")
488
errors.UnexpectedEndOfContainerError, reader.validate)
490
def test_validate_interrupted_body(self):
491
"""EOF during reading a record's body causes validate to fail."""
492
reader = self.get_reader_for(b"1\n\n")
494
errors.UnexpectedEndOfContainerError, reader.validate)
496
def test_validate_unparseable_length(self):
497
"""An unparseable record length causes validate to fail."""
498
reader = self.get_reader_for(b"\n\n")
500
errors.InvalidRecordError, reader.validate)
502
def test_validate_undecodeable_name(self):
503
"""Names that aren't valid UTF-8 cause validate to fail."""
504
reader = self.get_reader_for(b"0\n\xcc\n\n")
505
self.assertRaises(errors.InvalidRecordError, reader.validate)
507
def test_read_max_length(self):
508
"""If the max_length passed to the callable returned by read is not
509
None, then no more than that many bytes will be read.
511
reader = self.get_reader_for(b"6\n\nabcdef")
512
names, get_bytes = reader.read()
513
self.assertEqual(b'abc', get_bytes(3))
515
def test_read_no_max_length(self):
516
"""If the max_length passed to the callable returned by read is None,
517
then all the bytes in the record will be read.
519
reader = self.get_reader_for(b"6\n\nabcdef")
520
names, get_bytes = reader.read()
521
self.assertEqual(b'abcdef', get_bytes(None))
523
def test_repeated_read_calls(self):
524
"""Repeated calls to the callable returned from BytesRecordReader.read
525
will not read beyond the end of the record.
527
reader = self.get_reader_for(b"6\n\nabcdefB3\nnext-record\nXXX")
528
names, get_bytes = reader.read()
529
self.assertEqual(b'abcdef', get_bytes(None))
530
self.assertEqual(b'', get_bytes(None))
531
self.assertEqual(b'', get_bytes(99))
534
class TestMakeReadvReader(tests.TestCaseWithTransport):
536
def test_read_skipping_records(self):
537
pack_data = BytesIO()
538
writer = pack.ContainerWriter(pack_data.write)
541
memos.append(writer.add_bytes_record([b'abc'], 3, names=[]))
542
memos.append(writer.add_bytes_record([b'def'], 3, names=[(b'name1', )]))
543
memos.append(writer.add_bytes_record([b'ghi'], 3, names=[(b'name2', )]))
544
memos.append(writer.add_bytes_record([b'jkl'], 3, names=[]))
546
transport = self.get_transport()
547
transport.put_bytes('mypack', pack_data.getvalue())
548
requested_records = [memos[0], memos[2]]
549
reader = pack.make_readv_reader(transport, 'mypack', requested_records)
551
for names, reader_func in reader.iter_records():
552
result.append((names, reader_func(None)))
553
self.assertEqual([([], b'abc'), ([(b'name2', )], b'ghi')], result)
556
class TestReadvFile(tests.TestCaseWithTransport):
557
"""Tests of the ReadVFile class.
559
Error cases are deliberately undefined: this code adapts the underlying
560
transport interface to a single 'streaming read' interface as
561
ContainerReader needs.
564
def test_read_bytes(self):
565
"""Test reading of both single bytes and all bytes in a hunk."""
566
transport = self.get_transport()
567
transport.put_bytes('sample', b'0123456789')
568
f = pack.ReadVFile(transport.readv(
569
'sample', [(0, 1), (1, 2), (4, 1), (6, 2)]))
571
results.append(f.read(1))
572
results.append(f.read(2))
573
results.append(f.read(1))
574
results.append(f.read(1))
575
results.append(f.read(1))
576
self.assertEqual([b'0', b'12', b'4', b'6', b'7'], results)
578
def test_readline(self):
579
"""Test using readline() as ContainerReader does.
581
This is always within a readv hunk, never across it.
583
transport = self.get_transport()
584
transport.put_bytes('sample', b'0\n2\n4\n')
585
f = pack.ReadVFile(transport.readv('sample', [(0, 2), (2, 4)]))
587
results.append(f.readline())
588
results.append(f.readline())
589
results.append(f.readline())
590
self.assertEqual([b'0\n', b'2\n', b'4\n'], results)
592
def test_readline_and_read(self):
593
"""Test exercising one byte reads, readline, and then read again."""
594
transport = self.get_transport()
595
transport.put_bytes('sample', b'0\n2\n4\n')
596
f = pack.ReadVFile(transport.readv('sample', [(0, 6)]))
598
results.append(f.read(1))
599
results.append(f.readline())
600
results.append(f.read(4))
601
self.assertEqual([b'0', b'\n', b'2\n4\n'], results)
604
class PushParserTestCase(tests.TestCase):
605
"""Base class for TestCases involving ContainerPushParser."""
607
def make_parser_expecting_record_type(self):
608
parser = pack.ContainerPushParser()
609
parser.accept_bytes(b"Bazaar pack format 1 (introduced in 0.18)\n")
612
def make_parser_expecting_bytes_record(self):
613
parser = pack.ContainerPushParser()
614
parser.accept_bytes(b"Bazaar pack format 1 (introduced in 0.18)\nB")
617
def assertRecordParsing(self, expected_record, data):
618
"""Assert that 'bytes' is parsed as a given bytes record.
620
:param expected_record: A tuple of (names, bytes).
622
parser = self.make_parser_expecting_bytes_record()
623
parser.accept_bytes(data)
624
parsed_records = parser.read_pending_records()
625
self.assertEqual([expected_record], parsed_records)
628
class TestContainerPushParser(PushParserTestCase):
629
"""Tests for ContainerPushParser.
631
The ContainerPushParser reads format 1 containers, so these tests
632
explicitly test how it reacts to format 1 data. If a new version of the
633
format is added, then separate tests for that format should be added.
636
def test_construct(self):
637
"""ContainerPushParser can be constructed."""
638
pack.ContainerPushParser()
640
def test_multiple_records_at_once(self):
641
"""If multiple records worth of data are fed to the parser in one
642
string, the parser will correctly parse all the records.
644
(A naive implementation might stop after parsing the first record.)
646
parser = self.make_parser_expecting_record_type()
647
parser.accept_bytes(b"B5\nname1\n\nbody1B5\nname2\n\nbody2")
649
[([(b'name1',)], b'body1'), ([(b'name2',)], b'body2')],
650
parser.read_pending_records())
652
def test_multiple_empty_records_at_once(self):
653
"""If multiple empty records worth of data are fed to the parser in one
654
string, the parser will correctly parse all the records.
656
(A naive implementation might stop after parsing the first empty
657
record, because the buffer size had not changed.)
659
parser = self.make_parser_expecting_record_type()
660
parser.accept_bytes(b"B0\nname1\n\nB0\nname2\n\n")
662
[([(b'name1',)], b''), ([(b'name2',)], b'')],
663
parser.read_pending_records())
666
class TestContainerPushParserBytesParsing(PushParserTestCase):
667
"""Tests for reading Bytes records with ContainerPushParser.
669
The ContainerPushParser reads format 1 containers, so these tests
670
explicitly test how it reacts to format 1 data. If a new version of the
671
format is added, then separate tests for that format should be added.
674
def test_record_with_no_name(self):
675
"""Reading a Bytes record with no name returns an empty list of
678
self.assertRecordParsing(([], b'aaaaa'), b"5\n\naaaaa")
680
def test_record_with_one_name(self):
681
"""Reading a Bytes record with one name returns a list of just that
684
self.assertRecordParsing(
685
([(b'name1', )], b'aaaaa'),
686
b"5\nname1\n\naaaaa")
688
def test_record_with_two_names(self):
689
"""Reading a Bytes record with two names returns a list of both names.
691
self.assertRecordParsing(
692
([(b'name1', ), (b'name2', )], b'aaaaa'),
693
b"5\nname1\nname2\n\naaaaa")
695
def test_record_with_two_part_names(self):
696
"""Reading a Bytes record with a two_part name reads both."""
697
self.assertRecordParsing(
698
([(b'name1', b'name2')], b'aaaaa'),
699
b"5\nname1\x00name2\n\naaaaa")
701
def test_invalid_length(self):
702
"""If the length-prefix is not a number, parsing raises
705
parser = self.make_parser_expecting_bytes_record()
707
errors.InvalidRecordError, parser.accept_bytes, b"not a number\n")
709
def test_incomplete_record(self):
710
"""If the bytes seen so far don't form a complete record, then there
711
will be nothing returned by read_pending_records.
713
parser = self.make_parser_expecting_bytes_record()
714
parser.accept_bytes(b"5\n\nabcd")
715
self.assertEqual([], parser.read_pending_records())
717
def test_accept_nothing(self):
718
"""The edge case of parsing an empty string causes no error."""
719
parser = self.make_parser_expecting_bytes_record()
720
parser.accept_bytes(b"")
722
def assertInvalidRecord(self, data):
723
"""Assert that parsing the given bytes raises InvalidRecordError."""
724
parser = self.make_parser_expecting_bytes_record()
726
errors.InvalidRecordError, parser.accept_bytes, data)
728
def test_read_invalid_name_whitespace(self):
729
"""Names must have no whitespace."""
730
# A name with a space.
731
self.assertInvalidRecord(b"0\nbad name\n\n")
734
self.assertInvalidRecord(b"0\nbad\tname\n\n")
736
# A name with a vertical tab.
737
self.assertInvalidRecord(b"0\nbad\vname\n\n")
739
def test_repeated_read_pending_records(self):
740
"""read_pending_records will not return the same record twice."""
741
parser = self.make_parser_expecting_bytes_record()
742
parser.accept_bytes(b"6\n\nabcdef")
743
self.assertEqual([([], b'abcdef')], parser.read_pending_records())
744
self.assertEqual([], parser.read_pending_records())