35
32
def test_begin(self):
36
33
serialiser = pack.ContainerSerialiser()
37
self.assertEqual(b'Bazaar pack format 1 (introduced in 0.18)\n',
34
self.assertEqual('Bazaar pack format 1 (introduced in 0.18)\n',
38
35
serialiser.begin())
40
37
def test_end(self):
41
38
serialiser = pack.ContainerSerialiser()
42
self.assertEqual(b'E', serialiser.end())
39
self.assertEqual('E', serialiser.end())
44
41
def test_bytes_record_no_name(self):
45
42
serialiser = pack.ContainerSerialiser()
46
record = serialiser.bytes_record(b'bytes', [])
47
self.assertEqual(b'B5\n\nbytes', record)
43
record = serialiser.bytes_record('bytes', [])
44
self.assertEqual('B5\n\nbytes', record)
49
46
def test_bytes_record_one_name_with_one_part(self):
50
47
serialiser = pack.ContainerSerialiser()
51
record = serialiser.bytes_record(b'bytes', [(b'name',)])
52
self.assertEqual(b'B5\nname\n\nbytes', record)
48
record = serialiser.bytes_record('bytes', [('name',)])
49
self.assertEqual('B5\nname\n\nbytes', record)
54
51
def test_bytes_record_one_name_with_two_parts(self):
55
52
serialiser = pack.ContainerSerialiser()
56
record = serialiser.bytes_record(b'bytes', [(b'part1', b'part2')])
57
self.assertEqual(b'B5\npart1\x00part2\n\nbytes', record)
53
record = serialiser.bytes_record('bytes', [('part1', 'part2')])
54
self.assertEqual('B5\npart1\x00part2\n\nbytes', record)
59
56
def test_bytes_record_two_names(self):
60
57
serialiser = pack.ContainerSerialiser()
61
record = serialiser.bytes_record(b'bytes', [(b'name1',), (b'name2',)])
62
self.assertEqual(b'B5\nname1\nname2\n\nbytes', record)
58
record = serialiser.bytes_record('bytes', [('name1',), ('name2',)])
59
self.assertEqual('B5\nname1\nname2\n\nbytes', record)
64
61
def test_bytes_record_whitespace_in_name_part(self):
65
62
serialiser = pack.ContainerSerialiser()
67
64
errors.InvalidRecordError,
68
serialiser.bytes_record, b'bytes', [(b'bad name',)])
65
serialiser.bytes_record, 'bytes', [('bad name',)])
70
67
def test_bytes_record_header(self):
71
68
serialiser = pack.ContainerSerialiser()
72
record = serialiser.bytes_header(32, [(b'name1',), (b'name2',)])
73
self.assertEqual(b'B32\nname1\nname2\n\n', record)
69
record = serialiser.bytes_header(32, [('name1',), ('name2',)])
70
self.assertEqual('B32\nname1\nname2\n\n', record)
76
73
class TestContainerWriter(tests.TestCase):
79
76
super(TestContainerWriter, self).setUp()
80
self.output = BytesIO()
77
self.output = StringIO()
81
78
self.writer = pack.ContainerWriter(self.output.write)
83
80
def assertOutput(self, expected_output):
119
116
def test_non_empty_end_does_not_add_a_record_to_records_written(self):
120
117
"""The end() method does not count towards the records written."""
121
118
self.writer.begin()
122
self.writer.add_bytes_record(b'foo', names=[])
119
self.writer.add_bytes_record('foo', names=[])
123
120
self.writer.end()
124
121
self.assertEqual(1, self.writer.records_written)
126
123
def test_add_bytes_record_no_name(self):
127
124
"""Add a bytes record with no name."""
128
125
self.writer.begin()
129
offset, length = self.writer.add_bytes_record(b'abc', names=[])
126
offset, length = self.writer.add_bytes_record('abc', names=[])
130
127
self.assertEqual((42, 7), (offset, length))
131
128
self.assertOutput(
132
b'Bazaar pack format 1 (introduced in 0.18)\nB3\n\nabc')
129
'Bazaar pack format 1 (introduced in 0.18)\nB3\n\nabc')
134
131
def test_add_bytes_record_one_name(self):
135
132
"""Add a bytes record with one name."""
136
133
self.writer.begin()
138
135
offset, length = self.writer.add_bytes_record(
139
b'abc', names=[(b'name1', )])
136
'abc', names=[('name1', )])
140
137
self.assertEqual((42, 13), (offset, length))
141
138
self.assertOutput(
142
b'Bazaar pack format 1 (introduced in 0.18)\n'
139
'Bazaar pack format 1 (introduced in 0.18)\n'
145
142
def test_add_bytes_record_split_writes(self):
146
143
"""Write a large record which does multiple IOs"""
149
146
real_write = self.writer.write_func
151
def record_writes(data):
153
return real_write(data)
148
def record_writes(bytes):
150
return real_write(bytes)
155
152
self.writer.write_func = record_writes
156
153
self.writer._JOIN_WRITES_THRESHOLD = 2
158
155
self.writer.begin()
159
156
offset, length = self.writer.add_bytes_record(
160
b'abcabc', names=[(b'name1', )])
157
'abcabc', names=[('name1', )])
161
158
self.assertEqual((42, 16), (offset, length))
162
159
self.assertOutput(
163
b'Bazaar pack format 1 (introduced in 0.18)\n'
164
b'B6\nname1\n\nabcabc')
160
'Bazaar pack format 1 (introduced in 0.18)\n'
161
'B6\nname1\n\nabcabc')
167
b'Bazaar pack format 1 (introduced in 0.18)\n',
164
'Bazaar pack format 1 (introduced in 0.18)\n',
172
169
def test_add_bytes_record_two_names(self):
173
170
"""Add a bytes record with two names."""
174
171
self.writer.begin()
175
172
offset, length = self.writer.add_bytes_record(
176
b'abc', names=[(b'name1', ), (b'name2', )])
173
'abc', names=[('name1', ), ('name2', )])
177
174
self.assertEqual((42, 19), (offset, length))
178
175
self.assertOutput(
179
b'Bazaar pack format 1 (introduced in 0.18)\n'
180
b'B3\nname1\nname2\n\nabc')
176
'Bazaar pack format 1 (introduced in 0.18)\n'
177
'B3\nname1\nname2\n\nabc')
182
179
def test_add_bytes_record_two_names(self):
183
180
"""Add a bytes record with two names."""
184
181
self.writer.begin()
185
182
offset, length = self.writer.add_bytes_record(
186
b'abc', names=[(b'name1', ), (b'name2', )])
183
'abc', names=[('name1', ), ('name2', )])
187
184
self.assertEqual((42, 19), (offset, length))
188
185
self.assertOutput(
189
b'Bazaar pack format 1 (introduced in 0.18)\n'
190
b'B3\nname1\nname2\n\nabc')
186
'Bazaar pack format 1 (introduced in 0.18)\n'
187
'B3\nname1\nname2\n\nabc')
192
189
def test_add_bytes_record_two_element_name(self):
193
190
"""Add a bytes record with a two-element name."""
194
191
self.writer.begin()
195
192
offset, length = self.writer.add_bytes_record(
196
b'abc', names=[(b'name1', b'name2')])
193
'abc', names=[('name1', 'name2')])
197
194
self.assertEqual((42, 19), (offset, length))
198
195
self.assertOutput(
199
b'Bazaar pack format 1 (introduced in 0.18)\n'
200
b'B3\nname1\x00name2\n\nabc')
196
'Bazaar pack format 1 (introduced in 0.18)\n'
197
'B3\nname1\x00name2\n\nabc')
202
199
def test_add_second_bytes_record_gets_higher_offset(self):
203
200
self.writer.begin()
204
self.writer.add_bytes_record(b'abc', names=[])
205
offset, length = self.writer.add_bytes_record(b'abc', names=[])
201
self.writer.add_bytes_record('abc', names=[])
202
offset, length = self.writer.add_bytes_record('abc', names=[])
206
203
self.assertEqual((49, 7), (offset, length))
207
204
self.assertOutput(
208
b'Bazaar pack format 1 (introduced in 0.18)\n'
205
'Bazaar pack format 1 (introduced in 0.18)\n'
212
209
def test_add_bytes_record_invalid_name(self):
213
210
"""Adding a Bytes record with a name with whitespace in it raises
385
381
"""Reading a Bytes record with no name returns an empty list of
388
reader = self.get_reader_for(b"5\n\naaaaa")
384
reader = self.get_reader_for("5\n\naaaaa")
389
385
names, get_bytes = reader.read()
390
386
self.assertEqual([], names)
391
self.assertEqual(b'aaaaa', get_bytes(None))
387
self.assertEqual('aaaaa', get_bytes(None))
393
389
def test_record_with_one_name(self):
394
390
"""Reading a Bytes record with one name returns a list of just that
397
reader = self.get_reader_for(b"5\nname1\n\naaaaa")
393
reader = self.get_reader_for("5\nname1\n\naaaaa")
398
394
names, get_bytes = reader.read()
399
self.assertEqual([(b'name1', )], names)
400
self.assertEqual(b'aaaaa', get_bytes(None))
395
self.assertEqual([('name1', )], names)
396
self.assertEqual('aaaaa', get_bytes(None))
402
398
def test_record_with_two_names(self):
403
399
"""Reading a Bytes record with two names returns a list of both names.
405
reader = self.get_reader_for(b"5\nname1\nname2\n\naaaaa")
401
reader = self.get_reader_for("5\nname1\nname2\n\naaaaa")
406
402
names, get_bytes = reader.read()
407
self.assertEqual([(b'name1', ), (b'name2', )], names)
408
self.assertEqual(b'aaaaa', get_bytes(None))
403
self.assertEqual([('name1', ), ('name2', )], names)
404
self.assertEqual('aaaaa', get_bytes(None))
410
406
def test_record_with_two_part_names(self):
411
407
"""Reading a Bytes record with a two_part name reads both."""
412
reader = self.get_reader_for(b"5\nname1\x00name2\n\naaaaa")
408
reader = self.get_reader_for("5\nname1\x00name2\n\naaaaa")
413
409
names, get_bytes = reader.read()
414
self.assertEqual([(b'name1', b'name2', )], names)
415
self.assertEqual(b'aaaaa', get_bytes(None))
410
self.assertEqual([('name1', 'name2', )], names)
411
self.assertEqual('aaaaa', get_bytes(None))
417
413
def test_invalid_length(self):
418
414
"""If the length-prefix is not a number, parsing raises
419
415
InvalidRecordError.
421
reader = self.get_reader_for(b"not a number\n")
417
reader = self.get_reader_for("not a number\n")
422
418
self.assertRaises(errors.InvalidRecordError, reader.read)
424
420
def test_early_eof(self):
450
446
def test_initial_eof(self):
451
447
"""EOF before any bytes read at all."""
452
reader = self.get_reader_for(b"")
448
reader = self.get_reader_for("")
453
449
self.assertRaises(errors.UnexpectedEndOfContainerError, reader.read)
455
451
def test_eof_after_length(self):
456
452
"""EOF after reading the length and before reading name(s)."""
457
reader = self.get_reader_for(b"123\n")
453
reader = self.get_reader_for("123\n")
458
454
self.assertRaises(errors.UnexpectedEndOfContainerError, reader.read)
460
456
def test_eof_during_name(self):
461
457
"""EOF during reading a name."""
462
reader = self.get_reader_for(b"123\nname")
458
reader = self.get_reader_for("123\nname")
463
459
self.assertRaises(errors.UnexpectedEndOfContainerError, reader.read)
465
461
def test_read_invalid_name_whitespace(self):
466
462
"""Names must have no whitespace."""
467
463
# A name with a space.
468
reader = self.get_reader_for(b"0\nbad name\n\n")
464
reader = self.get_reader_for("0\nbad name\n\n")
469
465
self.assertRaises(errors.InvalidRecordError, reader.read)
471
467
# A name with a tab.
472
reader = self.get_reader_for(b"0\nbad\tname\n\n")
468
reader = self.get_reader_for("0\nbad\tname\n\n")
473
469
self.assertRaises(errors.InvalidRecordError, reader.read)
475
471
# A name with a vertical tab.
476
reader = self.get_reader_for(b"0\nbad\vname\n\n")
472
reader = self.get_reader_for("0\nbad\vname\n\n")
477
473
self.assertRaises(errors.InvalidRecordError, reader.read)
479
475
def test_validate_whitespace_in_name(self):
480
476
"""Names must have no whitespace."""
481
reader = self.get_reader_for(b"0\nbad name\n\n")
477
reader = self.get_reader_for("0\nbad name\n\n")
482
478
self.assertRaises(errors.InvalidRecordError, reader.validate)
484
480
def test_validate_interrupted_prelude(self):
485
481
"""EOF during reading a record's prelude causes validate to fail."""
486
reader = self.get_reader_for(b"")
482
reader = self.get_reader_for("")
487
483
self.assertRaises(
488
484
errors.UnexpectedEndOfContainerError, reader.validate)
490
486
def test_validate_interrupted_body(self):
491
487
"""EOF during reading a record's body causes validate to fail."""
492
reader = self.get_reader_for(b"1\n\n")
488
reader = self.get_reader_for("1\n\n")
493
489
self.assertRaises(
494
490
errors.UnexpectedEndOfContainerError, reader.validate)
496
492
def test_validate_unparseable_length(self):
497
493
"""An unparseable record length causes validate to fail."""
498
reader = self.get_reader_for(b"\n\n")
494
reader = self.get_reader_for("\n\n")
499
495
self.assertRaises(
500
496
errors.InvalidRecordError, reader.validate)
502
498
def test_validate_undecodeable_name(self):
503
499
"""Names that aren't valid UTF-8 cause validate to fail."""
504
reader = self.get_reader_for(b"0\n\xcc\n\n")
500
reader = self.get_reader_for("0\n\xcc\n\n")
505
501
self.assertRaises(errors.InvalidRecordError, reader.validate)
507
503
def test_read_max_length(self):
508
504
"""If the max_length passed to the callable returned by read is not
509
505
None, then no more than that many bytes will be read.
511
reader = self.get_reader_for(b"6\n\nabcdef")
507
reader = self.get_reader_for("6\n\nabcdef")
512
508
names, get_bytes = reader.read()
513
self.assertEqual(b'abc', get_bytes(3))
509
self.assertEqual('abc', get_bytes(3))
515
511
def test_read_no_max_length(self):
516
512
"""If the max_length passed to the callable returned by read is None,
517
513
then all the bytes in the record will be read.
519
reader = self.get_reader_for(b"6\n\nabcdef")
515
reader = self.get_reader_for("6\n\nabcdef")
520
516
names, get_bytes = reader.read()
521
self.assertEqual(b'abcdef', get_bytes(None))
517
self.assertEqual('abcdef', get_bytes(None))
523
519
def test_repeated_read_calls(self):
524
520
"""Repeated calls to the callable returned from BytesRecordReader.read
525
521
will not read beyond the end of the record.
527
reader = self.get_reader_for(b"6\n\nabcdefB3\nnext-record\nXXX")
523
reader = self.get_reader_for("6\n\nabcdefB3\nnext-record\nXXX")
528
524
names, get_bytes = reader.read()
529
self.assertEqual(b'abcdef', get_bytes(None))
530
self.assertEqual(b'', get_bytes(None))
531
self.assertEqual(b'', get_bytes(99))
525
self.assertEqual('abcdef', get_bytes(None))
526
self.assertEqual('', get_bytes(None))
527
self.assertEqual('', get_bytes(99))
534
530
class TestMakeReadvReader(tests.TestCaseWithTransport):
536
532
def test_read_skipping_records(self):
537
pack_data = BytesIO()
533
pack_data = StringIO()
538
534
writer = pack.ContainerWriter(pack_data.write)
541
memos.append(writer.add_bytes_record(b'abc', names=[]))
542
memos.append(writer.add_bytes_record(b'def', names=[(b'name1', )]))
543
memos.append(writer.add_bytes_record(b'ghi', names=[(b'name2', )]))
544
memos.append(writer.add_bytes_record(b'jkl', names=[]))
537
memos.append(writer.add_bytes_record('abc', names=[]))
538
memos.append(writer.add_bytes_record('def', names=[('name1', )]))
539
memos.append(writer.add_bytes_record('ghi', names=[('name2', )]))
540
memos.append(writer.add_bytes_record('jkl', names=[]))
546
542
transport = self.get_transport()
547
transport.put_bytes(b'mypack', pack_data.getvalue())
543
transport.put_bytes('mypack', pack_data.getvalue())
548
544
requested_records = [memos[0], memos[2]]
549
reader = pack.make_readv_reader(transport, b'mypack', requested_records)
545
reader = pack.make_readv_reader(transport, 'mypack', requested_records)
551
547
for names, reader_func in reader.iter_records():
552
548
result.append((names, reader_func(None)))
553
self.assertEqual([([], b'abc'), ([(b'name2', )], b'ghi')], result)
549
self.assertEqual([([], 'abc'), ([('name2', )], 'ghi')], result)
556
552
class TestReadvFile(tests.TestCaseWithTransport):
564
560
def test_read_bytes(self):
565
561
"""Test reading of both single bytes and all bytes in a hunk."""
566
562
transport = self.get_transport()
567
transport.put_bytes(b'sample', b'0123456789')
568
f = pack.ReadVFile(transport.readv(b'sample', [(0, 1), (1, 2), (4, 1), (6, 2)]))
563
transport.put_bytes('sample', '0123456789')
564
f = pack.ReadVFile(transport.readv('sample', [(0,1), (1,2), (4,1), (6,2)]))
570
566
results.append(f.read(1))
571
567
results.append(f.read(2))
572
568
results.append(f.read(1))
573
569
results.append(f.read(1))
574
570
results.append(f.read(1))
575
self.assertEqual([b'0', b'12', b'4', b'6', b'7'], results)
571
self.assertEqual(['0', '12', '4', '6', '7'], results)
577
573
def test_readline(self):
578
574
"""Test using readline() as ContainerReader does.
580
576
This is always within a readv hunk, never across it.
582
578
transport = self.get_transport()
583
transport.put_bytes(b'sample', b'0\n2\n4\n')
584
f = pack.ReadVFile(transport.readv(b'sample', [(0, 2), (2, 4)]))
579
transport.put_bytes('sample', '0\n2\n4\n')
580
f = pack.ReadVFile(transport.readv('sample', [(0,2), (2,4)]))
586
582
results.append(f.readline())
587
583
results.append(f.readline())
588
584
results.append(f.readline())
589
self.assertEqual([b'0\n', b'2\n', b'4\n'], results)
585
self.assertEqual(['0\n', '2\n', '4\n'], results)
591
587
def test_readline_and_read(self):
592
588
"""Test exercising one byte reads, readline, and then read again."""
593
589
transport = self.get_transport()
594
transport.put_bytes(b'sample', b'0\n2\n4\n')
595
f = pack.ReadVFile(transport.readv(b'sample', [(0, 6)]))
590
transport.put_bytes('sample', '0\n2\n4\n')
591
f = pack.ReadVFile(transport.readv('sample', [(0,6)]))
597
593
results.append(f.read(1))
598
594
results.append(f.readline())
599
595
results.append(f.read(4))
600
self.assertEqual([b'0', b'\n', b'2\n4\n'], results)
596
self.assertEqual(['0', '\n', '2\n4\n'], results)
603
599
class PushParserTestCase(tests.TestCase):
674
670
"""Reading a Bytes record with no name returns an empty list of
677
self.assertRecordParsing(([], b'aaaaa'), b"5\n\naaaaa")
673
self.assertRecordParsing(([], 'aaaaa'), "5\n\naaaaa")
679
675
def test_record_with_one_name(self):
680
676
"""Reading a Bytes record with one name returns a list of just that
683
679
self.assertRecordParsing(
684
([(b'name1', )], b'aaaaa'),
685
b"5\nname1\n\naaaaa")
680
([('name1', )], 'aaaaa'),
687
683
def test_record_with_two_names(self):
688
684
"""Reading a Bytes record with two names returns a list of both names.
690
686
self.assertRecordParsing(
691
([(b'name1', ), (b'name2', )], b'aaaaa'),
692
b"5\nname1\nname2\n\naaaaa")
687
([('name1', ), ('name2', )], 'aaaaa'),
688
"5\nname1\nname2\n\naaaaa")
694
690
def test_record_with_two_part_names(self):
695
691
"""Reading a Bytes record with a two_part name reads both."""
696
692
self.assertRecordParsing(
697
([(b'name1', b'name2')], b'aaaaa'),
698
b"5\nname1\x00name2\n\naaaaa")
693
([('name1', 'name2')], 'aaaaa'),
694
"5\nname1\x00name2\n\naaaaa")
700
696
def test_invalid_length(self):
701
697
"""If the length-prefix is not a number, parsing raises
704
700
parser = self.make_parser_expecting_bytes_record()
705
701
self.assertRaises(
706
errors.InvalidRecordError, parser.accept_bytes, b"not a number\n")
702
errors.InvalidRecordError, parser.accept_bytes, "not a number\n")
708
704
def test_incomplete_record(self):
709
705
"""If the bytes seen so far don't form a complete record, then there
710
706
will be nothing returned by read_pending_records.
712
708
parser = self.make_parser_expecting_bytes_record()
713
parser.accept_bytes(b"5\n\nabcd")
709
parser.accept_bytes("5\n\nabcd")
714
710
self.assertEqual([], parser.read_pending_records())
716
712
def test_accept_nothing(self):
717
713
"""The edge case of parsing an empty string causes no error."""
718
714
parser = self.make_parser_expecting_bytes_record()
719
parser.accept_bytes(b"")
715
parser.accept_bytes("")
721
def assertInvalidRecord(self, data):
722
"""Assert that parsing the given bytes raises InvalidRecordError."""
717
def assertInvalidRecord(self, bytes):
718
"""Assert that parsing the given bytes will raise an
723
721
parser = self.make_parser_expecting_bytes_record()
724
722
self.assertRaises(
725
errors.InvalidRecordError, parser.accept_bytes, data)
723
errors.InvalidRecordError, parser.accept_bytes, bytes)
727
725
def test_read_invalid_name_whitespace(self):
728
726
"""Names must have no whitespace."""
729
727
# A name with a space.
730
self.assertInvalidRecord(b"0\nbad name\n\n")
728
self.assertInvalidRecord("0\nbad name\n\n")
732
730
# A name with a tab.
733
self.assertInvalidRecord(b"0\nbad\tname\n\n")
731
self.assertInvalidRecord("0\nbad\tname\n\n")
735
733
# A name with a vertical tab.
736
self.assertInvalidRecord(b"0\nbad\vname\n\n")
734
self.assertInvalidRecord("0\nbad\vname\n\n")
738
736
def test_repeated_read_pending_records(self):
739
737
"""read_pending_records will not return the same record twice."""
740
738
parser = self.make_parser_expecting_bytes_record()
741
parser.accept_bytes(b"6\n\nabcdef")
742
self.assertEqual([([], b'abcdef')], parser.read_pending_records())
739
parser.accept_bytes("6\n\nabcdef")
740
self.assertEqual([([], 'abcdef')], parser.read_pending_records())
743
741
self.assertEqual([], parser.read_pending_records())