32
35
def test_begin(self):
33
36
serialiser = pack.ContainerSerialiser()
34
self.assertEqual('Bazaar pack format 1 (introduced in 0.18)\n',
37
self.assertEqual(b'Bazaar pack format 1 (introduced in 0.18)\n',
35
38
serialiser.begin())
37
40
def test_end(self):
38
41
serialiser = pack.ContainerSerialiser()
39
self.assertEqual('E', serialiser.end())
42
self.assertEqual(b'E', serialiser.end())
41
44
def test_bytes_record_no_name(self):
42
45
serialiser = pack.ContainerSerialiser()
43
record = serialiser.bytes_record('bytes', [])
44
self.assertEqual('B5\n\nbytes', record)
46
record = serialiser.bytes_record(b'bytes', [])
47
self.assertEqual(b'B5\n\nbytes', record)
46
49
def test_bytes_record_one_name_with_one_part(self):
47
50
serialiser = pack.ContainerSerialiser()
48
record = serialiser.bytes_record('bytes', [('name',)])
49
self.assertEqual('B5\nname\n\nbytes', record)
51
record = serialiser.bytes_record(b'bytes', [(b'name',)])
52
self.assertEqual(b'B5\nname\n\nbytes', record)
51
54
def test_bytes_record_one_name_with_two_parts(self):
52
55
serialiser = pack.ContainerSerialiser()
53
record = serialiser.bytes_record('bytes', [('part1', 'part2')])
54
self.assertEqual('B5\npart1\x00part2\n\nbytes', record)
56
record = serialiser.bytes_record(b'bytes', [(b'part1', b'part2')])
57
self.assertEqual(b'B5\npart1\x00part2\n\nbytes', record)
56
59
def test_bytes_record_two_names(self):
57
60
serialiser = pack.ContainerSerialiser()
58
record = serialiser.bytes_record('bytes', [('name1',), ('name2',)])
59
self.assertEqual('B5\nname1\nname2\n\nbytes', record)
61
record = serialiser.bytes_record(b'bytes', [(b'name1',), (b'name2',)])
62
self.assertEqual(b'B5\nname1\nname2\n\nbytes', record)
61
64
def test_bytes_record_whitespace_in_name_part(self):
62
65
serialiser = pack.ContainerSerialiser()
64
67
errors.InvalidRecordError,
65
serialiser.bytes_record, 'bytes', [('bad name',)])
68
serialiser.bytes_record, b'bytes', [(b'bad name',)])
70
def test_bytes_record_header(self):
71
serialiser = pack.ContainerSerialiser()
72
record = serialiser.bytes_header(32, [(b'name1',), (b'name2',)])
73
self.assertEqual(b'B32\nname1\nname2\n\n', record)
68
76
class TestContainerWriter(tests.TestCase):
71
tests.TestCase.setUp(self)
72
self.output = StringIO()
79
super(TestContainerWriter, self).setUp()
80
self.output = BytesIO()
73
81
self.writer = pack.ContainerWriter(self.output.write)
75
83
def assertOutput(self, expected_output):
111
119
def test_non_empty_end_does_not_add_a_record_to_records_written(self):
112
120
"""The end() method does not count towards the records written."""
113
121
self.writer.begin()
114
self.writer.add_bytes_record('foo', names=[])
122
self.writer.add_bytes_record(b'foo', names=[])
115
123
self.writer.end()
116
124
self.assertEqual(1, self.writer.records_written)
118
126
def test_add_bytes_record_no_name(self):
119
127
"""Add a bytes record with no name."""
120
128
self.writer.begin()
121
offset, length = self.writer.add_bytes_record('abc', names=[])
129
offset, length = self.writer.add_bytes_record(b'abc', names=[])
122
130
self.assertEqual((42, 7), (offset, length))
123
131
self.assertOutput(
124
'Bazaar pack format 1 (introduced in 0.18)\nB3\n\nabc')
132
b'Bazaar pack format 1 (introduced in 0.18)\nB3\n\nabc')
126
134
def test_add_bytes_record_one_name(self):
127
135
"""Add a bytes record with one name."""
128
136
self.writer.begin()
129
138
offset, length = self.writer.add_bytes_record(
130
'abc', names=[('name1', )])
139
b'abc', names=[(b'name1', )])
131
140
self.assertEqual((42, 13), (offset, length))
132
141
self.assertOutput(
133
'Bazaar pack format 1 (introduced in 0.18)\n'
136
def test_add_bytes_record_two_names(self):
137
"""Add a bytes record with two names."""
139
offset, length = self.writer.add_bytes_record(
140
'abc', names=[('name1', ), ('name2', )])
141
self.assertEqual((42, 19), (offset, length))
143
'Bazaar pack format 1 (introduced in 0.18)\n'
144
'B3\nname1\nname2\n\nabc')
146
def test_add_bytes_record_two_names(self):
147
"""Add a bytes record with two names."""
149
offset, length = self.writer.add_bytes_record(
150
'abc', names=[('name1', ), ('name2', )])
151
self.assertEqual((42, 19), (offset, length))
153
'Bazaar pack format 1 (introduced in 0.18)\n'
154
'B3\nname1\nname2\n\nabc')
142
b'Bazaar pack format 1 (introduced in 0.18)\n'
145
def test_add_bytes_record_split_writes(self):
146
"""Write a large record which does multiple IOs"""
149
real_write = self.writer.write_func
151
def record_writes(data):
153
return real_write(data)
155
self.writer.write_func = record_writes
156
self.writer._JOIN_WRITES_THRESHOLD = 2
159
offset, length = self.writer.add_bytes_record(
160
b'abcabc', names=[(b'name1', )])
161
self.assertEqual((42, 16), (offset, length))
163
b'Bazaar pack format 1 (introduced in 0.18)\n'
164
b'B6\nname1\n\nabcabc')
167
b'Bazaar pack format 1 (introduced in 0.18)\n',
172
def test_add_bytes_record_two_names(self):
173
"""Add a bytes record with two names."""
175
offset, length = self.writer.add_bytes_record(
176
b'abc', names=[(b'name1', ), (b'name2', )])
177
self.assertEqual((42, 19), (offset, length))
179
b'Bazaar pack format 1 (introduced in 0.18)\n'
180
b'B3\nname1\nname2\n\nabc')
182
def test_add_bytes_record_two_names(self):
183
"""Add a bytes record with two names."""
185
offset, length = self.writer.add_bytes_record(
186
b'abc', names=[(b'name1', ), (b'name2', )])
187
self.assertEqual((42, 19), (offset, length))
189
b'Bazaar pack format 1 (introduced in 0.18)\n'
190
b'B3\nname1\nname2\n\nabc')
156
192
def test_add_bytes_record_two_element_name(self):
157
193
"""Add a bytes record with a two-element name."""
158
194
self.writer.begin()
159
195
offset, length = self.writer.add_bytes_record(
160
'abc', names=[('name1', 'name2')])
196
b'abc', names=[(b'name1', b'name2')])
161
197
self.assertEqual((42, 19), (offset, length))
162
198
self.assertOutput(
163
'Bazaar pack format 1 (introduced in 0.18)\n'
164
'B3\nname1\x00name2\n\nabc')
199
b'Bazaar pack format 1 (introduced in 0.18)\n'
200
b'B3\nname1\x00name2\n\nabc')
166
202
def test_add_second_bytes_record_gets_higher_offset(self):
167
203
self.writer.begin()
168
self.writer.add_bytes_record('abc', names=[])
169
offset, length = self.writer.add_bytes_record('abc', names=[])
204
self.writer.add_bytes_record(b'abc', names=[])
205
offset, length = self.writer.add_bytes_record(b'abc', names=[])
170
206
self.assertEqual((49, 7), (offset, length))
171
207
self.assertOutput(
172
'Bazaar pack format 1 (introduced in 0.18)\n'
208
b'Bazaar pack format 1 (introduced in 0.18)\n'
176
212
def test_add_bytes_record_invalid_name(self):
177
213
"""Adding a Bytes record with a name with whitespace in it raises
199
235
added, then separate tests for that format should be added.
202
def get_reader_for(self, bytes):
203
stream = StringIO(bytes)
238
def get_reader_for(self, data):
239
stream = BytesIO(data)
204
240
reader = pack.ContainerReader(stream)
207
243
def test_construct(self):
208
244
"""Test constructing a ContainerReader.
210
This uses None as the output stream to show that the constructor doesn't
211
try to use the input stream.
246
This uses None as the output stream to show that the constructor
247
doesn't try to use the input stream.
213
249
reader = pack.ContainerReader(None)
215
251
def test_empty_container(self):
216
252
"""Read an empty container."""
217
253
reader = self.get_reader_for(
218
"Bazaar pack format 1 (introduced in 0.18)\nE")
254
b"Bazaar pack format 1 (introduced in 0.18)\nE")
219
255
self.assertEqual([], list(reader.iter_records()))
221
257
def test_unknown_format(self):
222
258
"""Unrecognised container formats raise UnknownContainerFormatError."""
223
reader = self.get_reader_for("unknown format\n")
259
reader = self.get_reader_for(b"unknown format\n")
224
260
self.assertRaises(
225
261
errors.UnknownContainerFormatError, reader.iter_records)
347
385
"""Reading a Bytes record with no name returns an empty list of
350
reader = self.get_reader_for("5\n\naaaaa")
388
reader = self.get_reader_for(b"5\n\naaaaa")
351
389
names, get_bytes = reader.read()
352
390
self.assertEqual([], names)
353
self.assertEqual('aaaaa', get_bytes(None))
391
self.assertEqual(b'aaaaa', get_bytes(None))
355
393
def test_record_with_one_name(self):
356
394
"""Reading a Bytes record with one name returns a list of just that
359
reader = self.get_reader_for("5\nname1\n\naaaaa")
397
reader = self.get_reader_for(b"5\nname1\n\naaaaa")
360
398
names, get_bytes = reader.read()
361
self.assertEqual([('name1', )], names)
362
self.assertEqual('aaaaa', get_bytes(None))
399
self.assertEqual([(b'name1', )], names)
400
self.assertEqual(b'aaaaa', get_bytes(None))
364
402
def test_record_with_two_names(self):
365
403
"""Reading a Bytes record with two names returns a list of both names.
367
reader = self.get_reader_for("5\nname1\nname2\n\naaaaa")
405
reader = self.get_reader_for(b"5\nname1\nname2\n\naaaaa")
368
406
names, get_bytes = reader.read()
369
self.assertEqual([('name1', ), ('name2', )], names)
370
self.assertEqual('aaaaa', get_bytes(None))
407
self.assertEqual([(b'name1', ), (b'name2', )], names)
408
self.assertEqual(b'aaaaa', get_bytes(None))
372
410
def test_record_with_two_part_names(self):
373
411
"""Reading a Bytes record with a two_part name reads both."""
374
reader = self.get_reader_for("5\nname1\x00name2\n\naaaaa")
412
reader = self.get_reader_for(b"5\nname1\x00name2\n\naaaaa")
375
413
names, get_bytes = reader.read()
376
self.assertEqual([('name1', 'name2', )], names)
377
self.assertEqual('aaaaa', get_bytes(None))
414
self.assertEqual([(b'name1', b'name2', )], names)
415
self.assertEqual(b'aaaaa', get_bytes(None))
379
417
def test_invalid_length(self):
380
418
"""If the length-prefix is not a number, parsing raises
381
419
InvalidRecordError.
383
reader = self.get_reader_for("not a number\n")
421
reader = self.get_reader_for(b"not a number\n")
384
422
self.assertRaises(errors.InvalidRecordError, reader.read)
386
424
def test_early_eof(self):
412
450
def test_initial_eof(self):
413
451
"""EOF before any bytes read at all."""
414
reader = self.get_reader_for("")
452
reader = self.get_reader_for(b"")
415
453
self.assertRaises(errors.UnexpectedEndOfContainerError, reader.read)
417
455
def test_eof_after_length(self):
418
456
"""EOF after reading the length and before reading name(s)."""
419
reader = self.get_reader_for("123\n")
457
reader = self.get_reader_for(b"123\n")
420
458
self.assertRaises(errors.UnexpectedEndOfContainerError, reader.read)
422
460
def test_eof_during_name(self):
423
461
"""EOF during reading a name."""
424
reader = self.get_reader_for("123\nname")
462
reader = self.get_reader_for(b"123\nname")
425
463
self.assertRaises(errors.UnexpectedEndOfContainerError, reader.read)
427
465
def test_read_invalid_name_whitespace(self):
428
466
"""Names must have no whitespace."""
429
467
# A name with a space.
430
reader = self.get_reader_for("0\nbad name\n\n")
468
reader = self.get_reader_for(b"0\nbad name\n\n")
431
469
self.assertRaises(errors.InvalidRecordError, reader.read)
433
471
# A name with a tab.
434
reader = self.get_reader_for("0\nbad\tname\n\n")
472
reader = self.get_reader_for(b"0\nbad\tname\n\n")
435
473
self.assertRaises(errors.InvalidRecordError, reader.read)
437
475
# A name with a vertical tab.
438
reader = self.get_reader_for("0\nbad\vname\n\n")
476
reader = self.get_reader_for(b"0\nbad\vname\n\n")
439
477
self.assertRaises(errors.InvalidRecordError, reader.read)
441
479
def test_validate_whitespace_in_name(self):
442
480
"""Names must have no whitespace."""
443
reader = self.get_reader_for("0\nbad name\n\n")
481
reader = self.get_reader_for(b"0\nbad name\n\n")
444
482
self.assertRaises(errors.InvalidRecordError, reader.validate)
446
484
def test_validate_interrupted_prelude(self):
447
485
"""EOF during reading a record's prelude causes validate to fail."""
448
reader = self.get_reader_for("")
486
reader = self.get_reader_for(b"")
449
487
self.assertRaises(
450
488
errors.UnexpectedEndOfContainerError, reader.validate)
452
490
def test_validate_interrupted_body(self):
453
491
"""EOF during reading a record's body causes validate to fail."""
454
reader = self.get_reader_for("1\n\n")
492
reader = self.get_reader_for(b"1\n\n")
455
493
self.assertRaises(
456
494
errors.UnexpectedEndOfContainerError, reader.validate)
458
496
def test_validate_unparseable_length(self):
459
497
"""An unparseable record length causes validate to fail."""
460
reader = self.get_reader_for("\n\n")
498
reader = self.get_reader_for(b"\n\n")
461
499
self.assertRaises(
462
500
errors.InvalidRecordError, reader.validate)
464
502
def test_validate_undecodeable_name(self):
465
503
"""Names that aren't valid UTF-8 cause validate to fail."""
466
reader = self.get_reader_for("0\n\xcc\n\n")
504
reader = self.get_reader_for(b"0\n\xcc\n\n")
467
505
self.assertRaises(errors.InvalidRecordError, reader.validate)
469
507
def test_read_max_length(self):
470
508
"""If the max_length passed to the callable returned by read is not
471
509
None, then no more than that many bytes will be read.
473
reader = self.get_reader_for("6\n\nabcdef")
511
reader = self.get_reader_for(b"6\n\nabcdef")
474
512
names, get_bytes = reader.read()
475
self.assertEqual('abc', get_bytes(3))
513
self.assertEqual(b'abc', get_bytes(3))
477
515
def test_read_no_max_length(self):
478
516
"""If the max_length passed to the callable returned by read is None,
479
517
then all the bytes in the record will be read.
481
reader = self.get_reader_for("6\n\nabcdef")
519
reader = self.get_reader_for(b"6\n\nabcdef")
482
520
names, get_bytes = reader.read()
483
self.assertEqual('abcdef', get_bytes(None))
521
self.assertEqual(b'abcdef', get_bytes(None))
485
523
def test_repeated_read_calls(self):
486
524
"""Repeated calls to the callable returned from BytesRecordReader.read
487
525
will not read beyond the end of the record.
489
reader = self.get_reader_for("6\n\nabcdefB3\nnext-record\nXXX")
527
reader = self.get_reader_for(b"6\n\nabcdefB3\nnext-record\nXXX")
490
528
names, get_bytes = reader.read()
491
self.assertEqual('abcdef', get_bytes(None))
492
self.assertEqual('', get_bytes(None))
493
self.assertEqual('', get_bytes(99))
529
self.assertEqual(b'abcdef', get_bytes(None))
530
self.assertEqual(b'', get_bytes(None))
531
self.assertEqual(b'', get_bytes(99))
496
534
class TestMakeReadvReader(tests.TestCaseWithTransport):
498
536
def test_read_skipping_records(self):
499
pack_data = StringIO()
537
pack_data = BytesIO()
500
538
writer = pack.ContainerWriter(pack_data.write)
503
memos.append(writer.add_bytes_record('abc', names=[]))
504
memos.append(writer.add_bytes_record('def', names=[('name1', )]))
505
memos.append(writer.add_bytes_record('ghi', names=[('name2', )]))
506
memos.append(writer.add_bytes_record('jkl', names=[]))
541
memos.append(writer.add_bytes_record(b'abc', names=[]))
542
memos.append(writer.add_bytes_record(b'def', names=[(b'name1', )]))
543
memos.append(writer.add_bytes_record(b'ghi', names=[(b'name2', )]))
544
memos.append(writer.add_bytes_record(b'jkl', names=[]))
508
546
transport = self.get_transport()
509
transport.put_bytes('mypack', pack_data.getvalue())
547
transport.put_bytes(b'mypack', pack_data.getvalue())
510
548
requested_records = [memos[0], memos[2]]
511
reader = pack.make_readv_reader(transport, 'mypack', requested_records)
549
reader = pack.make_readv_reader(transport, b'mypack', requested_records)
513
551
for names, reader_func in reader.iter_records():
514
552
result.append((names, reader_func(None)))
515
self.assertEqual([([], 'abc'), ([('name2', )], 'ghi')], result)
553
self.assertEqual([([], b'abc'), ([(b'name2', )], b'ghi')], result)
518
556
class TestReadvFile(tests.TestCaseWithTransport):
526
564
def test_read_bytes(self):
527
565
"""Test reading of both single bytes and all bytes in a hunk."""
528
566
transport = self.get_transport()
529
transport.put_bytes('sample', '0123456789')
530
f = pack.ReadVFile(transport.readv('sample', [(0,1), (1,2), (4,1), (6,2)]))
567
transport.put_bytes(b'sample', b'0123456789')
568
f = pack.ReadVFile(transport.readv(b'sample', [(0, 1), (1, 2), (4, 1), (6, 2)]))
532
570
results.append(f.read(1))
533
571
results.append(f.read(2))
534
572
results.append(f.read(1))
535
573
results.append(f.read(1))
536
574
results.append(f.read(1))
537
self.assertEqual(['0', '12', '4', '6', '7'], results)
575
self.assertEqual([b'0', b'12', b'4', b'6', b'7'], results)
539
577
def test_readline(self):
540
578
"""Test using readline() as ContainerReader does.
542
580
This is always within a readv hunk, never across it.
544
582
transport = self.get_transport()
545
transport.put_bytes('sample', '0\n2\n4\n')
546
f = pack.ReadVFile(transport.readv('sample', [(0,2), (2,4)]))
583
transport.put_bytes(b'sample', b'0\n2\n4\n')
584
f = pack.ReadVFile(transport.readv(b'sample', [(0, 2), (2, 4)]))
548
586
results.append(f.readline())
549
587
results.append(f.readline())
550
588
results.append(f.readline())
551
self.assertEqual(['0\n', '2\n', '4\n'], results)
589
self.assertEqual([b'0\n', b'2\n', b'4\n'], results)
553
591
def test_readline_and_read(self):
554
592
"""Test exercising one byte reads, readline, and then read again."""
555
593
transport = self.get_transport()
556
transport.put_bytes('sample', '0\n2\n4\n')
557
f = pack.ReadVFile(transport.readv('sample', [(0,6)]))
594
transport.put_bytes(b'sample', b'0\n2\n4\n')
595
f = pack.ReadVFile(transport.readv(b'sample', [(0, 6)]))
559
597
results.append(f.read(1))
560
598
results.append(f.readline())
561
599
results.append(f.read(4))
562
self.assertEqual(['0', '\n', '2\n4\n'], results)
600
self.assertEqual([b'0', b'\n', b'2\n4\n'], results)
565
603
class PushParserTestCase(tests.TestCase):
636
674
"""Reading a Bytes record with no name returns an empty list of
639
self.assertRecordParsing(([], 'aaaaa'), "5\n\naaaaa")
677
self.assertRecordParsing(([], b'aaaaa'), b"5\n\naaaaa")
641
679
def test_record_with_one_name(self):
642
680
"""Reading a Bytes record with one name returns a list of just that
645
683
self.assertRecordParsing(
646
([('name1', )], 'aaaaa'),
684
([(b'name1', )], b'aaaaa'),
685
b"5\nname1\n\naaaaa")
649
687
def test_record_with_two_names(self):
650
688
"""Reading a Bytes record with two names returns a list of both names.
652
690
self.assertRecordParsing(
653
([('name1', ), ('name2', )], 'aaaaa'),
654
"5\nname1\nname2\n\naaaaa")
691
([(b'name1', ), (b'name2', )], b'aaaaa'),
692
b"5\nname1\nname2\n\naaaaa")
656
694
def test_record_with_two_part_names(self):
657
695
"""Reading a Bytes record with a two_part name reads both."""
658
696
self.assertRecordParsing(
659
([('name1', 'name2')], 'aaaaa'),
660
"5\nname1\x00name2\n\naaaaa")
697
([(b'name1', b'name2')], b'aaaaa'),
698
b"5\nname1\x00name2\n\naaaaa")
662
700
def test_invalid_length(self):
663
701
"""If the length-prefix is not a number, parsing raises
666
704
parser = self.make_parser_expecting_bytes_record()
667
705
self.assertRaises(
668
errors.InvalidRecordError, parser.accept_bytes, "not a number\n")
706
errors.InvalidRecordError, parser.accept_bytes, b"not a number\n")
670
708
def test_incomplete_record(self):
671
709
"""If the bytes seen so far don't form a complete record, then there
672
710
will be nothing returned by read_pending_records.
674
712
parser = self.make_parser_expecting_bytes_record()
675
parser.accept_bytes("5\n\nabcd")
713
parser.accept_bytes(b"5\n\nabcd")
676
714
self.assertEqual([], parser.read_pending_records())
678
716
def test_accept_nothing(self):
679
717
"""The edge case of parsing an empty string causes no error."""
680
718
parser = self.make_parser_expecting_bytes_record()
681
parser.accept_bytes("")
719
parser.accept_bytes(b"")
683
def assertInvalidRecord(self, bytes):
684
"""Assert that parsing the given bytes will raise an
721
def assertInvalidRecord(self, data):
722
"""Assert that parsing the given bytes raises InvalidRecordError."""
687
723
parser = self.make_parser_expecting_bytes_record()
688
724
self.assertRaises(
689
errors.InvalidRecordError, parser.accept_bytes, bytes)
725
errors.InvalidRecordError, parser.accept_bytes, data)
691
727
def test_read_invalid_name_whitespace(self):
692
728
"""Names must have no whitespace."""
693
729
# A name with a space.
694
self.assertInvalidRecord("0\nbad name\n\n")
730
self.assertInvalidRecord(b"0\nbad name\n\n")
696
732
# A name with a tab.
697
self.assertInvalidRecord("0\nbad\tname\n\n")
733
self.assertInvalidRecord(b"0\nbad\tname\n\n")
699
735
# A name with a vertical tab.
700
self.assertInvalidRecord("0\nbad\vname\n\n")
736
self.assertInvalidRecord(b"0\nbad\vname\n\n")
702
738
def test_repeated_read_pending_records(self):
703
739
"""read_pending_records will not return the same record twice."""
704
740
parser = self.make_parser_expecting_bytes_record()
705
parser.accept_bytes("6\n\nabcdef")
706
self.assertEqual([([], 'abcdef')], parser.read_pending_records())
741
parser.accept_bytes(b"6\n\nabcdef")
742
self.assertEqual([([], b'abcdef')], parser.read_pending_records())
707
743
self.assertEqual([], parser.read_pending_records())