1
# Copyright (C) 2007, 2009, 2011, 2012, 2016 Canonical Ltd
3
# This program is free software; you can redistribute it and/or modify
4
# it under the terms of the GNU General Public License as published by
5
# the Free Software Foundation; either version 2 of the License, or
6
# (at your option) any later version.
8
# This program is distributed in the hope that it will be useful,
9
# but WITHOUT ANY WARRANTY; without even the implied warranty of
10
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11
# GNU General Public License for more details.
13
# You should have received a copy of the GNU General Public License
14
# along with this program; if not, write to the Free Software
15
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
17
"""Tests for breezy.pack."""
19
from .. import errors, tests
23
from ..sixish import (
28
class TestContainerSerialiser(tests.TestCase):
29
"""Tests for the ContainerSerialiser class."""
31
def test_construct(self):
    """A ContainerSerialiser can be instantiated without arguments."""
    # Only construction is exercised; the instance is discarded.
    _ = pack.ContainerSerialiser()
36
serialiser = pack.ContainerSerialiser()
37
self.assertEqual(b'Bazaar pack format 1 (introduced in 0.18)\n',
41
serialiser = pack.ContainerSerialiser()
42
self.assertEqual(b'E', serialiser.end())
44
def test_bytes_record_no_name(self):
    """A bytes record given an empty name list serialises with no names."""
    expected = b'B5\n\nbytes'
    actual = pack.ContainerSerialiser().bytes_record(b'bytes', [])
    self.assertEqual(expected, actual)
49
def test_bytes_record_one_name_with_one_part(self):
    """A single one-part name appears on its own line before the body."""
    names = [(b'name',)]
    expected = b'B5\nname\n\nbytes'
    actual = pack.ContainerSerialiser().bytes_record(b'bytes', names)
    self.assertEqual(expected, actual)
54
def test_bytes_record_one_name_with_two_parts(self):
    """The parts of a two-part name are joined with a NUL byte."""
    names = [(b'part1', b'part2')]
    expected = b'B5\npart1\x00part2\n\nbytes'
    actual = pack.ContainerSerialiser().bytes_record(b'bytes', names)
    self.assertEqual(expected, actual)
59
def test_bytes_record_two_names(self):
    """Two names are written one per line, in the order given."""
    names = [(b'name1',), (b'name2',)]
    expected = b'B5\nname1\nname2\n\nbytes'
    actual = pack.ContainerSerialiser().bytes_record(b'bytes', names)
    self.assertEqual(expected, actual)
64
def test_bytes_record_whitespace_in_name_part(self):
65
serialiser = pack.ContainerSerialiser()
67
errors.InvalidRecordError,
68
serialiser.bytes_record, b'bytes', [(b'bad name',)])
70
def test_bytes_record_header(self):
    """bytes_header emits the length and names of a record, with no body."""
    names = [(b'name1',), (b'name2',)]
    header = pack.ContainerSerialiser().bytes_header(32, names)
    self.assertEqual(b'B32\nname1\nname2\n\n', header)
76
class TestContainerWriter(tests.TestCase):
79
super(TestContainerWriter, self).setUp()
80
self.output = BytesIO()
81
self.writer = pack.ContainerWriter(self.output.write)
83
def assertOutput(self, expected_output):
84
"""Assert that the output of self.writer ContainerWriter is equal to
87
self.assertEqual(expected_output, self.output.getvalue())
89
def test_construct(self):
90
"""Test constructing a ContainerWriter.
92
This uses None as the output stream to show that the constructor
93
doesn't try to use the output stream.
95
pack.ContainerWriter(None)
98
"""The begin() method writes the container format marker line."""
100
self.assertOutput(b'Bazaar pack format 1 (introduced in 0.18)\n')
102
def test_zero_records_written_after_begin(self):
103
"""After begin is written, 0 records have been written."""
105
self.assertEqual(0, self.writer.records_written)
108
"""The end() method writes an End Marker record."""
111
self.assertOutput(b'Bazaar pack format 1 (introduced in 0.18)\nE')
113
def test_empty_end_does_not_add_a_record_to_records_written(self):
114
"""The end() method does not count towards the records written."""
117
self.assertEqual(0, self.writer.records_written)
119
def test_non_empty_end_does_not_add_a_record_to_records_written(self):
120
"""The end() method does not count towards the records written."""
122
self.writer.add_bytes_record([b'foo'], len(b'foo'), names=[])
124
self.assertEqual(1, self.writer.records_written)
126
def test_add_bytes_record_no_name(self):
127
"""Add a bytes record with no name."""
129
offset, length = self.writer.add_bytes_record([b'abc'], len(b'abc'), names=[])
130
self.assertEqual((42, 7), (offset, length))
132
b'Bazaar pack format 1 (introduced in 0.18)\nB3\n\nabc')
134
def test_add_bytes_record_one_name(self):
135
"""Add a bytes record with one name."""
138
offset, length = self.writer.add_bytes_record(
139
[b'abc'], len(b'abc'), names=[(b'name1', )])
140
self.assertEqual((42, 13), (offset, length))
142
b'Bazaar pack format 1 (introduced in 0.18)\n'
145
def test_add_bytes_record_split_writes(self):
146
"""Write a large record which does multiple IOs"""
149
real_write = self.writer.write_func
151
def record_writes(data):
153
return real_write(data)
155
self.writer.write_func = record_writes
156
self.writer._JOIN_WRITES_THRESHOLD = 2
159
offset, length = self.writer.add_bytes_record(
160
[b'abcabc'], len(b'abcabc'), names=[(b'name1', )])
161
self.assertEqual((42, 16), (offset, length))
163
b'Bazaar pack format 1 (introduced in 0.18)\n'
164
b'B6\nname1\n\nabcabc')
167
b'Bazaar pack format 1 (introduced in 0.18)\n',
172
def test_add_bytes_record_two_names(self):
173
"""Add a bytes record with two names."""
175
offset, length = self.writer.add_bytes_record(
176
[b'abc'], len(b'abc'), names=[(b'name1', ), (b'name2', )])
177
self.assertEqual((42, 19), (offset, length))
179
b'Bazaar pack format 1 (introduced in 0.18)\n'
180
b'B3\nname1\nname2\n\nabc')
182
def test_add_bytes_record_two_names(self):
183
"""Add a bytes record with two names."""
185
offset, length = self.writer.add_bytes_record(
186
[b'abc'], len(b'abc'), names=[(b'name1', ), (b'name2', )])
187
self.assertEqual((42, 19), (offset, length))
189
b'Bazaar pack format 1 (introduced in 0.18)\n'
190
b'B3\nname1\nname2\n\nabc')
192
def test_add_bytes_record_two_element_name(self):
193
"""Add a bytes record with a two-element name."""
195
offset, length = self.writer.add_bytes_record(
196
[b'abc'], len(b'abc'), names=[(b'name1', b'name2')])
197
self.assertEqual((42, 19), (offset, length))
199
b'Bazaar pack format 1 (introduced in 0.18)\n'
200
b'B3\nname1\x00name2\n\nabc')
202
def test_add_second_bytes_record_gets_higher_offset(self):
204
self.writer.add_bytes_record([b'a', b'bc'], len(b'abc'), names=[])
205
offset, length = self.writer.add_bytes_record([b'abc'], len(b'abc'), names=[])
206
self.assertEqual((49, 7), (offset, length))
208
b'Bazaar pack format 1 (introduced in 0.18)\n'
212
def test_add_bytes_record_invalid_name(self):
213
"""Adding a Bytes record with a name with whitespace in it raises
218
errors.InvalidRecordError,
219
self.writer.add_bytes_record, [b'abc'], len(b'abc'), names=[(b'bad name', )])
221
def test_add_bytes_records_add_to_records_written(self):
222
"""Adding a Bytes record increments the records_written counter."""
224
self.writer.add_bytes_record([b'foo'], len(b'foo'), names=[])
225
self.assertEqual(1, self.writer.records_written)
226
self.writer.add_bytes_record([b'foo'], len(b'foo'), names=[])
227
self.assertEqual(2, self.writer.records_written)
230
class TestContainerReader(tests.TestCase):
231
"""Tests for the ContainerReader.
233
The ContainerReader reads format 1 containers, so these tests explicitly
234
test how it reacts to format 1 data. If a new version of the format is
235
added, then separate tests for that format should be added.
238
def get_reader_for(self, data):
239
stream = BytesIO(data)
240
reader = pack.ContainerReader(stream)
243
def test_construct(self):
244
"""Test constructing a ContainerReader.
246
This uses None as the input stream to show that the constructor
247
doesn't try to use the input stream.
249
pack.ContainerReader(None)
251
def test_empty_container(self):
    """A container holding only the end marker yields no records."""
    container = b"Bazaar pack format 1 (introduced in 0.18)\nE"
    records = list(self.get_reader_for(container).iter_records())
    self.assertEqual([], records)
257
def test_unknown_format(self):
258
"""Unrecognised container formats raise UnknownContainerFormatError."""
259
reader = self.get_reader_for(b"unknown format\n")
261
errors.UnknownContainerFormatError, reader.iter_records)
263
def test_unexpected_end_of_container(self):
264
"""Containers that don't end with an End Marker record should cause
265
UnexpectedEndOfContainerError to be raised.
267
reader = self.get_reader_for(
268
b"Bazaar pack format 1 (introduced in 0.18)\n")
269
iterator = reader.iter_records()
271
errors.UnexpectedEndOfContainerError, next, iterator)
273
def test_unknown_record_type(self):
274
"""Unknown record types cause UnknownRecordTypeError to be raised."""
275
reader = self.get_reader_for(
276
b"Bazaar pack format 1 (introduced in 0.18)\nX")
277
iterator = reader.iter_records()
279
errors.UnknownRecordTypeError, next, iterator)
281
def test_container_with_one_unnamed_record(self):
282
"""Read a container with one Bytes record.
284
Parsing Bytes records is more thoroughly exercised by
285
TestBytesRecordReader. This test is here to ensure that
286
ContainerReader's integration with BytesRecordReader is working.
288
reader = self.get_reader_for(
289
b"Bazaar pack format 1 (introduced in 0.18)\nB5\n\naaaaaE")
290
expected_records = [([], b'aaaaa')]
293
[(names, read_bytes(None))
294
for (names, read_bytes) in reader.iter_records()])
296
def test_validate_empty_container(self):
297
"""validate does not raise an error for a container with no records."""
298
reader = self.get_reader_for(
299
b"Bazaar pack format 1 (introduced in 0.18)\nE")
300
# No exception raised
303
def test_validate_non_empty_valid_container(self):
304
"""validate does not raise an error for a container with a valid record.
306
reader = self.get_reader_for(
307
b"Bazaar pack format 1 (introduced in 0.18)\nB3\nname\n\nabcE")
308
# No exception raised
311
def test_validate_bad_format(self):
312
"""validate raises an error for unrecognised format strings.
314
It may raise either UnexpectedEndOfContainerError or
315
UnknownContainerFormatError, depending on exactly what the string is.
318
b"", b"x", b"Bazaar pack format 1 (introduced in 0.18)", b"bad\n"]
320
reader = self.get_reader_for(input)
322
(errors.UnexpectedEndOfContainerError,
323
errors.UnknownContainerFormatError),
326
def test_validate_bad_record_marker(self):
327
"""validate raises UnknownRecordTypeError for unrecognised record
330
reader = self.get_reader_for(
331
b"Bazaar pack format 1 (introduced in 0.18)\nX")
332
self.assertRaises(errors.UnknownRecordTypeError, reader.validate)
334
def test_validate_data_after_end_marker(self):
335
"""validate raises ContainerHasExcessDataError if there are any bytes
336
after the end of the container.
338
reader = self.get_reader_for(
339
b"Bazaar pack format 1 (introduced in 0.18)\nEcrud")
341
errors.ContainerHasExcessDataError, reader.validate)
343
def test_validate_no_end_marker(self):
344
"""validate raises UnexpectedEndOfContainerError if there's no end of
345
container marker, even if the container up to this point has been
348
reader = self.get_reader_for(
349
b"Bazaar pack format 1 (introduced in 0.18)\n")
351
errors.UnexpectedEndOfContainerError, reader.validate)
353
def test_validate_duplicate_name(self):
354
"""validate raises DuplicateRecordNameError if the same name occurs
355
multiple times in the container.
357
reader = self.get_reader_for(
358
b"Bazaar pack format 1 (introduced in 0.18)\n"
362
self.assertRaises(errors.DuplicateRecordNameError, reader.validate)
364
def test_validate_undecodeable_name(self):
    """validate rejects a record whose name is not valid UTF-8."""
    # The record name is the single byte 0xcc.
    container = b"Bazaar pack format 1 (introduced in 0.18)\nB0\n\xcc\n\nE"
    container_reader = self.get_reader_for(container)
    self.assertRaises(errors.InvalidRecordError, container_reader.validate)
371
class TestBytesRecordReader(tests.TestCase):
372
"""Tests for reading and validating Bytes records with
375
Like TestContainerReader, this explicitly tests the reading of format 1
376
data. If a new version of the format is added, then a separate set of
377
tests for reading that format should be added.
380
def get_reader_for(self, data):
381
stream = BytesIO(data)
382
reader = pack.BytesRecordReader(stream)
385
def test_record_with_no_name(self):
386
"""Reading a Bytes record with no name returns an empty list of
389
reader = self.get_reader_for(b"5\n\naaaaa")
390
names, get_bytes = reader.read()
391
self.assertEqual([], names)
392
self.assertEqual(b'aaaaa', get_bytes(None))
394
def test_record_with_one_name(self):
395
"""Reading a Bytes record with one name returns a list of just that
398
reader = self.get_reader_for(b"5\nname1\n\naaaaa")
399
names, get_bytes = reader.read()
400
self.assertEqual([(b'name1', )], names)
401
self.assertEqual(b'aaaaa', get_bytes(None))
403
def test_record_with_two_names(self):
404
"""Reading a Bytes record with two names returns a list of both names.
406
reader = self.get_reader_for(b"5\nname1\nname2\n\naaaaa")
407
names, get_bytes = reader.read()
408
self.assertEqual([(b'name1', ), (b'name2', )], names)
409
self.assertEqual(b'aaaaa', get_bytes(None))
411
def test_record_with_two_part_names(self):
    """A NUL-separated name is returned as a single two-element tuple."""
    record_reader = self.get_reader_for(b"5\nname1\x00name2\n\naaaaa")
    record_names, read_body = record_reader.read()
    self.assertEqual([(b'name1', b'name2', )], record_names)
    self.assertEqual(b'aaaaa', read_body(None))
418
def test_invalid_length(self):
419
"""If the length-prefix is not a number, parsing raises
422
reader = self.get_reader_for(b"not a number\n")
423
self.assertRaises(errors.InvalidRecordError, reader.read)
425
def test_early_eof(self):
426
"""Tests for premature EOF occurring during parsing Bytes records with
429
An incomplete container might be interrupted at any point. The
430
BytesRecordReader needs to cope with the input stream running out no
431
matter where it is in the parsing process.
433
In all cases, UnexpectedEndOfContainerError should be raised.
435
complete_record = b"6\nname\n\nabcdef"
436
for count in range(0, len(complete_record)):
437
incomplete_record = complete_record[:count]
438
reader = self.get_reader_for(incomplete_record)
439
# We don't use assertRaises to make diagnosing failures easier
440
# (assertRaises doesn't allow a custom failure message).
442
names, read_bytes = reader.read()
444
except errors.UnexpectedEndOfContainerError:
448
"UnexpectedEndOfContainerError not raised when parsing %r"
449
% (incomplete_record,))
451
def test_initial_eof(self):
    """Reading from completely empty input raises an unexpected-end error."""
    empty_reader = self.get_reader_for(b"")
    expected_error = errors.UnexpectedEndOfContainerError
    self.assertRaises(expected_error, empty_reader.read)
456
def test_eof_after_length(self):
    """Input that stops right after the length line raises an error."""
    truncated_reader = self.get_reader_for(b"123\n")
    expected_error = errors.UnexpectedEndOfContainerError
    self.assertRaises(expected_error, truncated_reader.read)
461
def test_eof_during_name(self):
    """Input that stops part-way through a name line raises an error."""
    truncated_reader = self.get_reader_for(b"123\nname")
    expected_error = errors.UnexpectedEndOfContainerError
    self.assertRaises(expected_error, truncated_reader.read)
466
def test_read_invalid_name_whitespace(self):
467
"""Names must have no whitespace."""
468
# A name with a space.
469
reader = self.get_reader_for(b"0\nbad name\n\n")
470
self.assertRaises(errors.InvalidRecordError, reader.read)
473
reader = self.get_reader_for(b"0\nbad\tname\n\n")
474
self.assertRaises(errors.InvalidRecordError, reader.read)
476
# A name with a vertical tab.
477
reader = self.get_reader_for(b"0\nbad\vname\n\n")
478
self.assertRaises(errors.InvalidRecordError, reader.read)
480
def test_validate_whitespace_in_name(self):
    """validate rejects a record whose name contains a space."""
    record_reader = self.get_reader_for(b"0\nbad name\n\n")
    self.assertRaises(errors.InvalidRecordError, record_reader.validate)
485
def test_validate_interrupted_prelude(self):
486
"""EOF during reading a record's prelude causes validate to fail."""
487
reader = self.get_reader_for(b"")
489
errors.UnexpectedEndOfContainerError, reader.validate)
491
def test_validate_interrupted_body(self):
492
"""EOF during reading a record's body causes validate to fail."""
493
reader = self.get_reader_for(b"1\n\n")
495
errors.UnexpectedEndOfContainerError, reader.validate)
497
def test_validate_unparseable_length(self):
498
"""An unparseable record length causes validate to fail."""
499
reader = self.get_reader_for(b"\n\n")
501
errors.InvalidRecordError, reader.validate)
503
def test_validate_undecodeable_name(self):
    """validate rejects a record whose name is not valid UTF-8."""
    # The record name is the single byte 0xcc.
    record_reader = self.get_reader_for(b"0\n\xcc\n\n")
    self.assertRaises(errors.InvalidRecordError, record_reader.validate)
508
def test_read_max_length(self):
509
"""If the max_length passed to the callable returned by read is not
510
None, then no more than that many bytes will be read.
512
reader = self.get_reader_for(b"6\n\nabcdef")
513
names, get_bytes = reader.read()
514
self.assertEqual(b'abc', get_bytes(3))
516
def test_read_no_max_length(self):
517
"""If the max_length passed to the callable returned by read is None,
518
then all the bytes in the record will be read.
520
reader = self.get_reader_for(b"6\n\nabcdef")
521
names, get_bytes = reader.read()
522
self.assertEqual(b'abcdef', get_bytes(None))
524
def test_repeated_read_calls(self):
525
"""Repeated calls to the callable returned from BytesRecordReader.read
526
will not read beyond the end of the record.
528
reader = self.get_reader_for(b"6\n\nabcdefB3\nnext-record\nXXX")
529
names, get_bytes = reader.read()
530
self.assertEqual(b'abcdef', get_bytes(None))
531
self.assertEqual(b'', get_bytes(None))
532
self.assertEqual(b'', get_bytes(99))
535
class TestMakeReadvReader(tests.TestCaseWithTransport):
537
def test_read_skipping_records(self):
538
pack_data = BytesIO()
539
writer = pack.ContainerWriter(pack_data.write)
542
memos.append(writer.add_bytes_record([b'abc'], 3, names=[]))
543
memos.append(writer.add_bytes_record([b'def'], 3, names=[(b'name1', )]))
544
memos.append(writer.add_bytes_record([b'ghi'], 3, names=[(b'name2', )]))
545
memos.append(writer.add_bytes_record([b'jkl'], 3, names=[]))
547
transport = self.get_transport()
548
transport.put_bytes('mypack', pack_data.getvalue())
549
requested_records = [memos[0], memos[2]]
550
reader = pack.make_readv_reader(transport, 'mypack', requested_records)
552
for names, reader_func in reader.iter_records():
553
result.append((names, reader_func(None)))
554
self.assertEqual([([], b'abc'), ([(b'name2', )], b'ghi')], result)
557
class TestReadvFile(tests.TestCaseWithTransport):
558
"""Tests of the ReadVFile class.
560
Error cases are deliberately undefined: this code adapts the underlying
561
transport interface to a single 'streaming read' interface as
562
ContainerReader needs.
565
def test_read_bytes(self):
566
"""Test reading of both single bytes and all bytes in a hunk."""
567
transport = self.get_transport()
568
transport.put_bytes('sample', b'0123456789')
569
f = pack.ReadVFile(transport.readv(
570
'sample', [(0, 1), (1, 2), (4, 1), (6, 2)]))
572
results.append(f.read(1))
573
results.append(f.read(2))
574
results.append(f.read(1))
575
results.append(f.read(1))
576
results.append(f.read(1))
577
self.assertEqual([b'0', b'12', b'4', b'6', b'7'], results)
579
def test_readline(self):
580
"""Test using readline() as ContainerReader does.
582
This is always within a readv hunk, never across it.
584
transport = self.get_transport()
585
transport.put_bytes('sample', b'0\n2\n4\n')
586
f = pack.ReadVFile(transport.readv('sample', [(0, 2), (2, 4)]))
588
results.append(f.readline())
589
results.append(f.readline())
590
results.append(f.readline())
591
self.assertEqual([b'0\n', b'2\n', b'4\n'], results)
593
def test_readline_and_read(self):
594
"""Test exercising one byte reads, readline, and then read again."""
595
transport = self.get_transport()
596
transport.put_bytes('sample', b'0\n2\n4\n')
597
f = pack.ReadVFile(transport.readv('sample', [(0, 6)]))
599
results.append(f.read(1))
600
results.append(f.readline())
601
results.append(f.read(4))
602
self.assertEqual([b'0', b'\n', b'2\n4\n'], results)
605
class PushParserTestCase(tests.TestCase):
606
"""Base class for TestCases involving ContainerPushParser."""
608
def make_parser_expecting_record_type(self):
609
parser = pack.ContainerPushParser()
610
parser.accept_bytes(b"Bazaar pack format 1 (introduced in 0.18)\n")
613
def make_parser_expecting_bytes_record(self):
614
parser = pack.ContainerPushParser()
615
parser.accept_bytes(b"Bazaar pack format 1 (introduced in 0.18)\nB")
618
def assertRecordParsing(self, expected_record, data):
619
"""Assert that 'bytes' is parsed as a given bytes record.
621
:param expected_record: A tuple of (names, bytes).
623
parser = self.make_parser_expecting_bytes_record()
624
parser.accept_bytes(data)
625
parsed_records = parser.read_pending_records()
626
self.assertEqual([expected_record], parsed_records)
629
class TestContainerPushParser(PushParserTestCase):
630
"""Tests for ContainerPushParser.
632
The ContainerPushParser reads format 1 containers, so these tests
633
explicitly test how it reacts to format 1 data. If a new version of the
634
format is added, then separate tests for that format should be added.
637
def test_construct(self):
    """A ContainerPushParser can be instantiated without arguments."""
    # Only construction is exercised; the instance is discarded.
    _ = pack.ContainerPushParser()
641
def test_multiple_records_at_once(self):
642
"""If multiple records worth of data are fed to the parser in one
643
string, the parser will correctly parse all the records.
645
(A naive implementation might stop after parsing the first record.)
647
parser = self.make_parser_expecting_record_type()
648
parser.accept_bytes(b"B5\nname1\n\nbody1B5\nname2\n\nbody2")
650
[([(b'name1',)], b'body1'), ([(b'name2',)], b'body2')],
651
parser.read_pending_records())
653
def test_multiple_empty_records_at_once(self):
654
"""If multiple empty records worth of data are fed to the parser in one
655
string, the parser will correctly parse all the records.
657
(A naive implementation might stop after parsing the first empty
658
record, because the buffer size had not changed.)
660
parser = self.make_parser_expecting_record_type()
661
parser.accept_bytes(b"B0\nname1\n\nB0\nname2\n\n")
663
[([(b'name1',)], b''), ([(b'name2',)], b'')],
664
parser.read_pending_records())
667
class TestContainerPushParserBytesParsing(PushParserTestCase):
668
"""Tests for reading Bytes records with ContainerPushParser.
670
The ContainerPushParser reads format 1 containers, so these tests
671
explicitly test how it reacts to format 1 data. If a new version of the
672
format is added, then separate tests for that format should be added.
675
def test_record_with_no_name(self):
676
"""Reading a Bytes record with no name returns an empty list of
679
self.assertRecordParsing(([], b'aaaaa'), b"5\n\naaaaa")
681
def test_record_with_one_name(self):
682
"""Reading a Bytes record with one name returns a list of just that
685
self.assertRecordParsing(
686
([(b'name1', )], b'aaaaa'),
687
b"5\nname1\n\naaaaa")
689
def test_record_with_two_names(self):
690
"""Reading a Bytes record with two names returns a list of both names.
692
self.assertRecordParsing(
693
([(b'name1', ), (b'name2', )], b'aaaaa'),
694
b"5\nname1\nname2\n\naaaaa")
696
def test_record_with_two_part_names(self):
    """A NUL-separated name is parsed into a single two-element tuple."""
    expected_record = ([(b'name1', b'name2')], b'aaaaa')
    raw_record = b"5\nname1\x00name2\n\naaaaa"
    self.assertRecordParsing(expected_record, raw_record)
702
def test_invalid_length(self):
703
"""If the length-prefix is not a number, parsing raises
706
parser = self.make_parser_expecting_bytes_record()
708
errors.InvalidRecordError, parser.accept_bytes, b"not a number\n")
710
def test_incomplete_record(self):
711
"""If the bytes seen so far don't form a complete record, then there
712
will be nothing returned by read_pending_records.
714
parser = self.make_parser_expecting_bytes_record()
715
parser.accept_bytes(b"5\n\nabcd")
716
self.assertEqual([], parser.read_pending_records())
718
def test_accept_nothing(self):
    """Feeding the parser an empty byte string must not raise."""
    push_parser = self.make_parser_expecting_bytes_record()
    nothing = b""
    push_parser.accept_bytes(nothing)
723
def assertInvalidRecord(self, data):
724
"""Assert that parsing the given bytes raises InvalidRecordError."""
725
parser = self.make_parser_expecting_bytes_record()
727
errors.InvalidRecordError, parser.accept_bytes, data)
729
def test_read_invalid_name_whitespace(self):
730
"""Names must have no whitespace."""
731
# A name with a space.
732
self.assertInvalidRecord(b"0\nbad name\n\n")
735
self.assertInvalidRecord(b"0\nbad\tname\n\n")
737
# A name with a vertical tab.
738
self.assertInvalidRecord(b"0\nbad\vname\n\n")
740
def test_repeated_read_pending_records(self):
    """A record is handed out once; a second read returns an empty list."""
    push_parser = self.make_parser_expecting_bytes_record()
    push_parser.accept_bytes(b"6\n\nabcdef")
    first_batch = push_parser.read_pending_records()
    self.assertEqual([([], b'abcdef')], first_batch)
    second_batch = push_parser.read_pending_records()
    self.assertEqual([], second_batch)