/brz/remove-bazaar

To get this branch, use:
bzr branch http://gegoxaren.bato24.eu/bzr/brz/remove-bazaar

« back to all changes in this revision

Viewing changes to breezy/tests/test_pack.py

  • Committer: Jelmer Vernooij
  • Date: 2017-06-08 23:30:31 UTC
  • mto: This revision was merged to the branch mainline in revision 6690.
  • Revision ID: jelmer@jelmer.uk-20170608233031-3qavls2o7a1pqllj
Update imports.

Show diffs side-by-side

added added

removed removed

Lines of Context:
16
16
 
17
17
"""Tests for breezy.pack."""
18
18
 
19
 
from io import BytesIO
20
 
 
21
 
from ... import errors, tests
22
 
from .. import (
 
19
from .. import errors, tests
 
20
from ..bzr import (
23
21
    pack,
24
22
    )
 
23
from ..sixish import (
 
24
    BytesIO,
 
25
    )
25
26
 
26
27
 
27
28
class TestContainerSerialiser(tests.TestCase):
33
34
 
34
35
    def test_begin(self):
35
36
        serialiser = pack.ContainerSerialiser()
36
 
        self.assertEqual(b'Bazaar pack format 1 (introduced in 0.18)\n',
 
37
        self.assertEqual('Bazaar pack format 1 (introduced in 0.18)\n',
37
38
                         serialiser.begin())
38
39
 
39
40
    def test_end(self):
40
41
        serialiser = pack.ContainerSerialiser()
41
 
        self.assertEqual(b'E', serialiser.end())
 
42
        self.assertEqual('E', serialiser.end())
42
43
 
43
44
    def test_bytes_record_no_name(self):
44
45
        serialiser = pack.ContainerSerialiser()
45
 
        record = serialiser.bytes_record(b'bytes', [])
46
 
        self.assertEqual(b'B5\n\nbytes', record)
 
46
        record = serialiser.bytes_record('bytes', [])
 
47
        self.assertEqual('B5\n\nbytes', record)
47
48
 
48
49
    def test_bytes_record_one_name_with_one_part(self):
49
50
        serialiser = pack.ContainerSerialiser()
50
 
        record = serialiser.bytes_record(b'bytes', [(b'name',)])
51
 
        self.assertEqual(b'B5\nname\n\nbytes', record)
 
51
        record = serialiser.bytes_record('bytes', [('name',)])
 
52
        self.assertEqual('B5\nname\n\nbytes', record)
52
53
 
53
54
    def test_bytes_record_one_name_with_two_parts(self):
54
55
        serialiser = pack.ContainerSerialiser()
55
 
        record = serialiser.bytes_record(b'bytes', [(b'part1', b'part2')])
56
 
        self.assertEqual(b'B5\npart1\x00part2\n\nbytes', record)
 
56
        record = serialiser.bytes_record('bytes', [('part1', 'part2')])
 
57
        self.assertEqual('B5\npart1\x00part2\n\nbytes', record)
57
58
 
58
59
    def test_bytes_record_two_names(self):
59
60
        serialiser = pack.ContainerSerialiser()
60
 
        record = serialiser.bytes_record(b'bytes', [(b'name1',), (b'name2',)])
61
 
        self.assertEqual(b'B5\nname1\nname2\n\nbytes', record)
 
61
        record = serialiser.bytes_record('bytes', [('name1',), ('name2',)])
 
62
        self.assertEqual('B5\nname1\nname2\n\nbytes', record)
62
63
 
63
64
    def test_bytes_record_whitespace_in_name_part(self):
64
65
        serialiser = pack.ContainerSerialiser()
65
66
        self.assertRaises(
66
67
            errors.InvalidRecordError,
67
 
            serialiser.bytes_record, b'bytes', [(b'bad name',)])
 
68
            serialiser.bytes_record, 'bytes', [('bad name',)])
68
69
 
69
70
    def test_bytes_record_header(self):
70
71
        serialiser = pack.ContainerSerialiser()
71
 
        record = serialiser.bytes_header(32, [(b'name1',), (b'name2',)])
72
 
        self.assertEqual(b'B32\nname1\nname2\n\n', record)
 
72
        record = serialiser.bytes_header(32, [('name1',), ('name2',)])
 
73
        self.assertEqual('B32\nname1\nname2\n\n', record)
73
74
 
74
75
 
75
76
class TestContainerWriter(tests.TestCase):
91
92
        This uses None as the output stream to show that the constructor
92
93
        doesn't try to use the output stream.
93
94
        """
94
 
        pack.ContainerWriter(None)
 
95
        writer = pack.ContainerWriter(None)
95
96
 
96
97
    def test_begin(self):
97
98
        """The begin() method writes the container format marker line."""
98
99
        self.writer.begin()
99
 
        self.assertOutput(b'Bazaar pack format 1 (introduced in 0.18)\n')
 
100
        self.assertOutput('Bazaar pack format 1 (introduced in 0.18)\n')
100
101
 
101
102
    def test_zero_records_written_after_begin(self):
102
103
        """After begin is written, 0 records have been written."""
107
108
        """The end() method writes an End Marker record."""
108
109
        self.writer.begin()
109
110
        self.writer.end()
110
 
        self.assertOutput(b'Bazaar pack format 1 (introduced in 0.18)\nE')
 
111
        self.assertOutput('Bazaar pack format 1 (introduced in 0.18)\nE')
111
112
 
112
113
    def test_empty_end_does_not_add_a_record_to_records_written(self):
113
114
        """The end() method does not count towards the records written."""
118
119
    def test_non_empty_end_does_not_add_a_record_to_records_written(self):
119
120
        """The end() method does not count towards the records written."""
120
121
        self.writer.begin()
121
 
        self.writer.add_bytes_record([b'foo'], len(b'foo'), names=[])
 
122
        self.writer.add_bytes_record('foo', names=[])
122
123
        self.writer.end()
123
124
        self.assertEqual(1, self.writer.records_written)
124
125
 
125
126
    def test_add_bytes_record_no_name(self):
126
127
        """Add a bytes record with no name."""
127
128
        self.writer.begin()
128
 
        offset, length = self.writer.add_bytes_record([b'abc'], len(b'abc'), names=[])
 
129
        offset, length = self.writer.add_bytes_record('abc', names=[])
129
130
        self.assertEqual((42, 7), (offset, length))
130
131
        self.assertOutput(
131
 
            b'Bazaar pack format 1 (introduced in 0.18)\nB3\n\nabc')
 
132
            'Bazaar pack format 1 (introduced in 0.18)\nB3\n\nabc')
132
133
 
133
134
    def test_add_bytes_record_one_name(self):
134
135
        """Add a bytes record with one name."""
135
136
        self.writer.begin()
136
137
 
137
138
        offset, length = self.writer.add_bytes_record(
138
 
            [b'abc'], len(b'abc'), names=[(b'name1', )])
 
139
            'abc', names=[('name1', )])
139
140
        self.assertEqual((42, 13), (offset, length))
140
141
        self.assertOutput(
141
 
            b'Bazaar pack format 1 (introduced in 0.18)\n'
142
 
            b'B3\nname1\n\nabc')
 
142
            'Bazaar pack format 1 (introduced in 0.18)\n'
 
143
            'B3\nname1\n\nabc')
143
144
 
144
145
    def test_add_bytes_record_split_writes(self):
145
146
        """Write a large record which does multiple IOs"""
147
148
        writes = []
148
149
        real_write = self.writer.write_func
149
150
 
150
 
        def record_writes(data):
151
 
            writes.append(data)
152
 
            return real_write(data)
 
151
        def record_writes(bytes):
 
152
            writes.append(bytes)
 
153
            return real_write(bytes)
153
154
 
154
155
        self.writer.write_func = record_writes
155
156
        self.writer._JOIN_WRITES_THRESHOLD = 2
156
157
 
157
158
        self.writer.begin()
158
159
        offset, length = self.writer.add_bytes_record(
159
 
            [b'abcabc'], len(b'abcabc'), names=[(b'name1', )])
 
160
            'abcabc', names=[('name1', )])
160
161
        self.assertEqual((42, 16), (offset, length))
161
162
        self.assertOutput(
162
 
            b'Bazaar pack format 1 (introduced in 0.18)\n'
163
 
            b'B6\nname1\n\nabcabc')
 
163
            'Bazaar pack format 1 (introduced in 0.18)\n'
 
164
            'B6\nname1\n\nabcabc')
164
165
 
165
166
        self.assertEqual([
166
 
            b'Bazaar pack format 1 (introduced in 0.18)\n',
167
 
            b'B6\nname1\n\n',
168
 
            b'abcabc'],
 
167
            'Bazaar pack format 1 (introduced in 0.18)\n',
 
168
            'B6\nname1\n\n',
 
169
            'abcabc'],
169
170
            writes)
170
171
 
171
172
    def test_add_bytes_record_two_names(self):
172
173
        """Add a bytes record with two names."""
173
174
        self.writer.begin()
174
175
        offset, length = self.writer.add_bytes_record(
175
 
            [b'abc'], len(b'abc'), names=[(b'name1', ), (b'name2', )])
 
176
            'abc', names=[('name1', ), ('name2', )])
176
177
        self.assertEqual((42, 19), (offset, length))
177
178
        self.assertOutput(
178
 
            b'Bazaar pack format 1 (introduced in 0.18)\n'
179
 
            b'B3\nname1\nname2\n\nabc')
 
179
            'Bazaar pack format 1 (introduced in 0.18)\n'
 
180
            'B3\nname1\nname2\n\nabc')
180
181
 
181
182
    def test_add_bytes_record_two_names(self):
182
183
        """Add a bytes record with two names."""
183
184
        self.writer.begin()
184
185
        offset, length = self.writer.add_bytes_record(
185
 
            [b'abc'], len(b'abc'), names=[(b'name1', ), (b'name2', )])
 
186
            'abc', names=[('name1', ), ('name2', )])
186
187
        self.assertEqual((42, 19), (offset, length))
187
188
        self.assertOutput(
188
 
            b'Bazaar pack format 1 (introduced in 0.18)\n'
189
 
            b'B3\nname1\nname2\n\nabc')
 
189
            'Bazaar pack format 1 (introduced in 0.18)\n'
 
190
            'B3\nname1\nname2\n\nabc')
190
191
 
191
192
    def test_add_bytes_record_two_element_name(self):
192
193
        """Add a bytes record with a two-element name."""
193
194
        self.writer.begin()
194
195
        offset, length = self.writer.add_bytes_record(
195
 
            [b'abc'], len(b'abc'), names=[(b'name1', b'name2')])
 
196
            'abc', names=[('name1', 'name2')])
196
197
        self.assertEqual((42, 19), (offset, length))
197
198
        self.assertOutput(
198
 
            b'Bazaar pack format 1 (introduced in 0.18)\n'
199
 
            b'B3\nname1\x00name2\n\nabc')
 
199
            'Bazaar pack format 1 (introduced in 0.18)\n'
 
200
            'B3\nname1\x00name2\n\nabc')
200
201
 
201
202
    def test_add_second_bytes_record_gets_higher_offset(self):
202
203
        self.writer.begin()
203
 
        self.writer.add_bytes_record([b'a', b'bc'], len(b'abc'), names=[])
204
 
        offset, length = self.writer.add_bytes_record([b'abc'], len(b'abc'), names=[])
 
204
        self.writer.add_bytes_record('abc', names=[])
 
205
        offset, length = self.writer.add_bytes_record('abc', names=[])
205
206
        self.assertEqual((49, 7), (offset, length))
206
207
        self.assertOutput(
207
 
            b'Bazaar pack format 1 (introduced in 0.18)\n'
208
 
            b'B3\n\nabc'
209
 
            b'B3\n\nabc')
 
208
            'Bazaar pack format 1 (introduced in 0.18)\n'
 
209
            'B3\n\nabc'
 
210
            'B3\n\nabc')
210
211
 
211
212
    def test_add_bytes_record_invalid_name(self):
212
213
        """Adding a Bytes record with a name with whitespace in it raises
215
216
        self.writer.begin()
216
217
        self.assertRaises(
217
218
            errors.InvalidRecordError,
218
 
            self.writer.add_bytes_record, [b'abc'], len(b'abc'), names=[(b'bad name', )])
 
219
            self.writer.add_bytes_record, 'abc', names=[('bad name', )])
219
220
 
220
221
    def test_add_bytes_records_add_to_records_written(self):
221
222
        """Adding a Bytes record increments the records_written counter."""
222
223
        self.writer.begin()
223
 
        self.writer.add_bytes_record([b'foo'], len(b'foo'), names=[])
 
224
        self.writer.add_bytes_record('foo', names=[])
224
225
        self.assertEqual(1, self.writer.records_written)
225
 
        self.writer.add_bytes_record([b'foo'], len(b'foo'), names=[])
 
226
        self.writer.add_bytes_record('foo', names=[])
226
227
        self.assertEqual(2, self.writer.records_written)
227
228
 
228
229
 
234
235
    added, then separate tests for that format should be added.
235
236
    """
236
237
 
237
 
    def get_reader_for(self, data):
238
 
        stream = BytesIO(data)
 
238
    def get_reader_for(self, bytes):
 
239
        stream = BytesIO(bytes)
239
240
        reader = pack.ContainerReader(stream)
240
241
        return reader
241
242
 
245
246
        This uses None as the output stream to show that the constructor
246
247
        doesn't try to use the input stream.
247
248
        """
248
 
        pack.ContainerReader(None)
 
249
        reader = pack.ContainerReader(None)
249
250
 
250
251
    def test_empty_container(self):
251
252
        """Read an empty container."""
252
253
        reader = self.get_reader_for(
253
 
            b"Bazaar pack format 1 (introduced in 0.18)\nE")
 
254
            "Bazaar pack format 1 (introduced in 0.18)\nE")
254
255
        self.assertEqual([], list(reader.iter_records()))
255
256
 
256
257
    def test_unknown_format(self):
257
258
        """Unrecognised container formats raise UnknownContainerFormatError."""
258
 
        reader = self.get_reader_for(b"unknown format\n")
 
259
        reader = self.get_reader_for("unknown format\n")
259
260
        self.assertRaises(
260
261
            errors.UnknownContainerFormatError, reader.iter_records)
261
262
 
264
265
        UnexpectedEndOfContainerError to be raised.
265
266
        """
266
267
        reader = self.get_reader_for(
267
 
            b"Bazaar pack format 1 (introduced in 0.18)\n")
 
268
            "Bazaar pack format 1 (introduced in 0.18)\n")
268
269
        iterator = reader.iter_records()
269
270
        self.assertRaises(
270
271
            errors.UnexpectedEndOfContainerError, next, iterator)
272
273
    def test_unknown_record_type(self):
273
274
        """Unknown record types cause UnknownRecordTypeError to be raised."""
274
275
        reader = self.get_reader_for(
275
 
            b"Bazaar pack format 1 (introduced in 0.18)\nX")
 
276
            "Bazaar pack format 1 (introduced in 0.18)\nX")
276
277
        iterator = reader.iter_records()
277
278
        self.assertRaises(
278
279
            errors.UnknownRecordTypeError, next, iterator)
285
286
        ContainerReader's integration with BytesRecordReader is working.
286
287
        """
287
288
        reader = self.get_reader_for(
288
 
            b"Bazaar pack format 1 (introduced in 0.18)\nB5\n\naaaaaE")
289
 
        expected_records = [([], b'aaaaa')]
 
289
            "Bazaar pack format 1 (introduced in 0.18)\nB5\n\naaaaaE")
 
290
        expected_records = [([], 'aaaaa')]
290
291
        self.assertEqual(
291
292
            expected_records,
292
293
            [(names, read_bytes(None))
295
296
    def test_validate_empty_container(self):
296
297
        """validate does not raise an error for a container with no records."""
297
298
        reader = self.get_reader_for(
298
 
            b"Bazaar pack format 1 (introduced in 0.18)\nE")
 
299
            "Bazaar pack format 1 (introduced in 0.18)\nE")
299
300
        # No exception raised
300
301
        reader.validate()
301
302
 
303
304
        """validate does not raise an error for a container with a valid record.
304
305
        """
305
306
        reader = self.get_reader_for(
306
 
            b"Bazaar pack format 1 (introduced in 0.18)\nB3\nname\n\nabcE")
 
307
            "Bazaar pack format 1 (introduced in 0.18)\nB3\nname\n\nabcE")
307
308
        # No exception raised
308
309
        reader.validate()
309
310
 
313
314
        It may raise either UnexpectedEndOfContainerError or
314
315
        UnknownContainerFormatError, depending on exactly what the string is.
315
316
        """
316
 
        inputs = [
317
 
            b"", b"x", b"Bazaar pack format 1 (introduced in 0.18)", b"bad\n"]
 
317
        inputs = ["", "x", "Bazaar pack format 1 (introduced in 0.18)", "bad\n"]
318
318
        for input in inputs:
319
319
            reader = self.get_reader_for(input)
320
320
            self.assertRaises(
327
327
        types.
328
328
        """
329
329
        reader = self.get_reader_for(
330
 
            b"Bazaar pack format 1 (introduced in 0.18)\nX")
 
330
            "Bazaar pack format 1 (introduced in 0.18)\nX")
331
331
        self.assertRaises(errors.UnknownRecordTypeError, reader.validate)
332
332
 
333
333
    def test_validate_data_after_end_marker(self):
335
335
        after the end of the container.
336
336
        """
337
337
        reader = self.get_reader_for(
338
 
            b"Bazaar pack format 1 (introduced in 0.18)\nEcrud")
 
338
            "Bazaar pack format 1 (introduced in 0.18)\nEcrud")
339
339
        self.assertRaises(
340
340
            errors.ContainerHasExcessDataError, reader.validate)
341
341
 
342
342
    def test_validate_no_end_marker(self):
343
343
        """validate raises UnexpectedEndOfContainerError if there's no end of
344
 
        container marker, even if the container up to this point has been
345
 
        valid.
 
344
        container marker, even if the container up to this point has been valid.
346
345
        """
347
346
        reader = self.get_reader_for(
348
 
            b"Bazaar pack format 1 (introduced in 0.18)\n")
 
347
            "Bazaar pack format 1 (introduced in 0.18)\n")
349
348
        self.assertRaises(
350
349
            errors.UnexpectedEndOfContainerError, reader.validate)
351
350
 
354
353
        multiple times in the container.
355
354
        """
356
355
        reader = self.get_reader_for(
357
 
            b"Bazaar pack format 1 (introduced in 0.18)\n"
358
 
            b"B0\nname\n\n"
359
 
            b"B0\nname\n\n"
360
 
            b"E")
 
356
            "Bazaar pack format 1 (introduced in 0.18)\n"
 
357
            "B0\nname\n\n"
 
358
            "B0\nname\n\n"
 
359
            "E")
361
360
        self.assertRaises(errors.DuplicateRecordNameError, reader.validate)
362
361
 
363
362
    def test_validate_undecodeable_name(self):
364
363
        """Names that aren't valid UTF-8 cause validate to fail."""
365
364
        reader = self.get_reader_for(
366
 
            b"Bazaar pack format 1 (introduced in 0.18)\nB0\n\xcc\n\nE")
 
365
            "Bazaar pack format 1 (introduced in 0.18)\nB0\n\xcc\n\nE")
367
366
        self.assertRaises(errors.InvalidRecordError, reader.validate)
368
367
 
369
368
 
376
375
    tests for reading that format should be added.
377
376
    """
378
377
 
379
 
    def get_reader_for(self, data):
380
 
        stream = BytesIO(data)
 
378
    def get_reader_for(self, bytes):
 
379
        stream = BytesIO(bytes)
381
380
        reader = pack.BytesRecordReader(stream)
382
381
        return reader
383
382
 
385
384
        """Reading a Bytes record with no name returns an empty list of
386
385
        names.
387
386
        """
388
 
        reader = self.get_reader_for(b"5\n\naaaaa")
 
387
        reader = self.get_reader_for("5\n\naaaaa")
389
388
        names, get_bytes = reader.read()
390
389
        self.assertEqual([], names)
391
 
        self.assertEqual(b'aaaaa', get_bytes(None))
 
390
        self.assertEqual('aaaaa', get_bytes(None))
392
391
 
393
392
    def test_record_with_one_name(self):
394
393
        """Reading a Bytes record with one name returns a list of just that
395
394
        name.
396
395
        """
397
 
        reader = self.get_reader_for(b"5\nname1\n\naaaaa")
 
396
        reader = self.get_reader_for("5\nname1\n\naaaaa")
398
397
        names, get_bytes = reader.read()
399
 
        self.assertEqual([(b'name1', )], names)
400
 
        self.assertEqual(b'aaaaa', get_bytes(None))
 
398
        self.assertEqual([('name1', )], names)
 
399
        self.assertEqual('aaaaa', get_bytes(None))
401
400
 
402
401
    def test_record_with_two_names(self):
403
402
        """Reading a Bytes record with two names returns a list of both names.
404
403
        """
405
 
        reader = self.get_reader_for(b"5\nname1\nname2\n\naaaaa")
 
404
        reader = self.get_reader_for("5\nname1\nname2\n\naaaaa")
406
405
        names, get_bytes = reader.read()
407
 
        self.assertEqual([(b'name1', ), (b'name2', )], names)
408
 
        self.assertEqual(b'aaaaa', get_bytes(None))
 
406
        self.assertEqual([('name1', ), ('name2', )], names)
 
407
        self.assertEqual('aaaaa', get_bytes(None))
409
408
 
410
409
    def test_record_with_two_part_names(self):
411
410
        """Reading a Bytes record with a two_part name reads both."""
412
 
        reader = self.get_reader_for(b"5\nname1\x00name2\n\naaaaa")
 
411
        reader = self.get_reader_for("5\nname1\x00name2\n\naaaaa")
413
412
        names, get_bytes = reader.read()
414
 
        self.assertEqual([(b'name1', b'name2', )], names)
415
 
        self.assertEqual(b'aaaaa', get_bytes(None))
 
413
        self.assertEqual([('name1', 'name2', )], names)
 
414
        self.assertEqual('aaaaa', get_bytes(None))
416
415
 
417
416
    def test_invalid_length(self):
418
417
        """If the length-prefix is not a number, parsing raises
419
418
        InvalidRecordError.
420
419
        """
421
 
        reader = self.get_reader_for(b"not a number\n")
 
420
        reader = self.get_reader_for("not a number\n")
422
421
        self.assertRaises(errors.InvalidRecordError, reader.read)
423
422
 
424
423
    def test_early_eof(self):
431
430
 
432
431
        In all cases, UnexpectedEndOfContainerError should be raised.
433
432
        """
434
 
        complete_record = b"6\nname\n\nabcdef"
 
433
        complete_record = "6\nname\n\nabcdef"
435
434
        for count in range(0, len(complete_record)):
436
435
            incomplete_record = complete_record[:count]
437
436
            reader = self.get_reader_for(incomplete_record)
449
448
 
450
449
    def test_initial_eof(self):
451
450
        """EOF before any bytes read at all."""
452
 
        reader = self.get_reader_for(b"")
 
451
        reader = self.get_reader_for("")
453
452
        self.assertRaises(errors.UnexpectedEndOfContainerError, reader.read)
454
453
 
455
454
    def test_eof_after_length(self):
456
455
        """EOF after reading the length and before reading name(s)."""
457
 
        reader = self.get_reader_for(b"123\n")
 
456
        reader = self.get_reader_for("123\n")
458
457
        self.assertRaises(errors.UnexpectedEndOfContainerError, reader.read)
459
458
 
460
459
    def test_eof_during_name(self):
461
460
        """EOF during reading a name."""
462
 
        reader = self.get_reader_for(b"123\nname")
 
461
        reader = self.get_reader_for("123\nname")
463
462
        self.assertRaises(errors.UnexpectedEndOfContainerError, reader.read)
464
463
 
465
464
    def test_read_invalid_name_whitespace(self):
466
465
        """Names must have no whitespace."""
467
466
        # A name with a space.
468
 
        reader = self.get_reader_for(b"0\nbad name\n\n")
 
467
        reader = self.get_reader_for("0\nbad name\n\n")
469
468
        self.assertRaises(errors.InvalidRecordError, reader.read)
470
469
 
471
470
        # A name with a tab.
472
 
        reader = self.get_reader_for(b"0\nbad\tname\n\n")
 
471
        reader = self.get_reader_for("0\nbad\tname\n\n")
473
472
        self.assertRaises(errors.InvalidRecordError, reader.read)
474
473
 
475
474
        # A name with a vertical tab.
476
 
        reader = self.get_reader_for(b"0\nbad\vname\n\n")
 
475
        reader = self.get_reader_for("0\nbad\vname\n\n")
477
476
        self.assertRaises(errors.InvalidRecordError, reader.read)
478
477
 
479
478
    def test_validate_whitespace_in_name(self):
480
479
        """Names must have no whitespace."""
481
 
        reader = self.get_reader_for(b"0\nbad name\n\n")
 
480
        reader = self.get_reader_for("0\nbad name\n\n")
482
481
        self.assertRaises(errors.InvalidRecordError, reader.validate)
483
482
 
484
483
    def test_validate_interrupted_prelude(self):
485
484
        """EOF during reading a record's prelude causes validate to fail."""
486
 
        reader = self.get_reader_for(b"")
 
485
        reader = self.get_reader_for("")
487
486
        self.assertRaises(
488
487
            errors.UnexpectedEndOfContainerError, reader.validate)
489
488
 
490
489
    def test_validate_interrupted_body(self):
491
490
        """EOF during reading a record's body causes validate to fail."""
492
 
        reader = self.get_reader_for(b"1\n\n")
 
491
        reader = self.get_reader_for("1\n\n")
493
492
        self.assertRaises(
494
493
            errors.UnexpectedEndOfContainerError, reader.validate)
495
494
 
496
495
    def test_validate_unparseable_length(self):
497
496
        """An unparseable record length causes validate to fail."""
498
 
        reader = self.get_reader_for(b"\n\n")
 
497
        reader = self.get_reader_for("\n\n")
499
498
        self.assertRaises(
500
499
            errors.InvalidRecordError, reader.validate)
501
500
 
502
501
    def test_validate_undecodeable_name(self):
503
502
        """Names that aren't valid UTF-8 cause validate to fail."""
504
 
        reader = self.get_reader_for(b"0\n\xcc\n\n")
 
503
        reader = self.get_reader_for("0\n\xcc\n\n")
505
504
        self.assertRaises(errors.InvalidRecordError, reader.validate)
506
505
 
507
506
    def test_read_max_length(self):
508
507
        """If the max_length passed to the callable returned by read is not
509
508
        None, then no more than that many bytes will be read.
510
509
        """
511
 
        reader = self.get_reader_for(b"6\n\nabcdef")
 
510
        reader = self.get_reader_for("6\n\nabcdef")
512
511
        names, get_bytes = reader.read()
513
 
        self.assertEqual(b'abc', get_bytes(3))
 
512
        self.assertEqual('abc', get_bytes(3))
514
513
 
515
514
    def test_read_no_max_length(self):
516
515
        """If the max_length passed to the callable returned by read is None,
517
516
        then all the bytes in the record will be read.
518
517
        """
519
 
        reader = self.get_reader_for(b"6\n\nabcdef")
 
518
        reader = self.get_reader_for("6\n\nabcdef")
520
519
        names, get_bytes = reader.read()
521
 
        self.assertEqual(b'abcdef', get_bytes(None))
 
520
        self.assertEqual('abcdef', get_bytes(None))
522
521
 
523
522
    def test_repeated_read_calls(self):
524
523
        """Repeated calls to the callable returned from BytesRecordReader.read
525
524
        will not read beyond the end of the record.
526
525
        """
527
 
        reader = self.get_reader_for(b"6\n\nabcdefB3\nnext-record\nXXX")
 
526
        reader = self.get_reader_for("6\n\nabcdefB3\nnext-record\nXXX")
528
527
        names, get_bytes = reader.read()
529
 
        self.assertEqual(b'abcdef', get_bytes(None))
530
 
        self.assertEqual(b'', get_bytes(None))
531
 
        self.assertEqual(b'', get_bytes(99))
 
528
        self.assertEqual('abcdef', get_bytes(None))
 
529
        self.assertEqual('', get_bytes(None))
 
530
        self.assertEqual('', get_bytes(99))
532
531
 
533
532
 
534
533
class TestMakeReadvReader(tests.TestCaseWithTransport):
538
537
        writer = pack.ContainerWriter(pack_data.write)
539
538
        writer.begin()
540
539
        memos = []
541
 
        memos.append(writer.add_bytes_record([b'abc'], 3, names=[]))
542
 
        memos.append(writer.add_bytes_record([b'def'], 3, names=[(b'name1', )]))
543
 
        memos.append(writer.add_bytes_record([b'ghi'], 3, names=[(b'name2', )]))
544
 
        memos.append(writer.add_bytes_record([b'jkl'], 3, names=[]))
 
540
        memos.append(writer.add_bytes_record('abc', names=[]))
 
541
        memos.append(writer.add_bytes_record('def', names=[('name1', )]))
 
542
        memos.append(writer.add_bytes_record('ghi', names=[('name2', )]))
 
543
        memos.append(writer.add_bytes_record('jkl', names=[]))
545
544
        writer.end()
546
545
        transport = self.get_transport()
547
546
        transport.put_bytes('mypack', pack_data.getvalue())
550
549
        result = []
551
550
        for names, reader_func in reader.iter_records():
552
551
            result.append((names, reader_func(None)))
553
 
        self.assertEqual([([], b'abc'), ([(b'name2', )], b'ghi')], result)
 
552
        self.assertEqual([([], 'abc'), ([('name2', )], 'ghi')], result)
554
553
 
555
554
 
556
555
class TestReadvFile(tests.TestCaseWithTransport):
564
563
    def test_read_bytes(self):
565
564
        """Test reading of both single bytes and all bytes in a hunk."""
566
565
        transport = self.get_transport()
567
 
        transport.put_bytes('sample', b'0123456789')
568
 
        f = pack.ReadVFile(transport.readv(
569
 
            'sample', [(0, 1), (1, 2), (4, 1), (6, 2)]))
 
566
        transport.put_bytes('sample', '0123456789')
 
567
        f = pack.ReadVFile(transport.readv('sample', [(0,1), (1,2), (4,1), (6,2)]))
570
568
        results = []
571
569
        results.append(f.read(1))
572
570
        results.append(f.read(2))
573
571
        results.append(f.read(1))
574
572
        results.append(f.read(1))
575
573
        results.append(f.read(1))
576
 
        self.assertEqual([b'0', b'12', b'4', b'6', b'7'], results)
 
574
        self.assertEqual(['0', '12', '4', '6', '7'], results)
577
575
 
578
576
    def test_readline(self):
579
577
        """Test using readline() as ContainerReader does.
581
579
        This is always within a readv hunk, never across it.
582
580
        """
583
581
        transport = self.get_transport()
584
 
        transport.put_bytes('sample', b'0\n2\n4\n')
585
 
        f = pack.ReadVFile(transport.readv('sample', [(0, 2), (2, 4)]))
 
582
        transport.put_bytes('sample', '0\n2\n4\n')
 
583
        f = pack.ReadVFile(transport.readv('sample', [(0,2), (2,4)]))
586
584
        results = []
587
585
        results.append(f.readline())
588
586
        results.append(f.readline())
589
587
        results.append(f.readline())
590
 
        self.assertEqual([b'0\n', b'2\n', b'4\n'], results)
 
588
        self.assertEqual(['0\n', '2\n', '4\n'], results)
591
589
 
592
590
    def test_readline_and_read(self):
593
591
        """Test exercising one byte reads, readline, and then read again."""
594
592
        transport = self.get_transport()
595
 
        transport.put_bytes('sample', b'0\n2\n4\n')
596
 
        f = pack.ReadVFile(transport.readv('sample', [(0, 6)]))
 
593
        transport.put_bytes('sample', '0\n2\n4\n')
 
594
        f = pack.ReadVFile(transport.readv('sample', [(0,6)]))
597
595
        results = []
598
596
        results.append(f.read(1))
599
597
        results.append(f.readline())
600
598
        results.append(f.read(4))
601
 
        self.assertEqual([b'0', b'\n', b'2\n4\n'], results)
 
599
        self.assertEqual(['0', '\n', '2\n4\n'], results)
602
600
 
603
601
 
604
602
class PushParserTestCase(tests.TestCase):
606
604
 
607
605
    def make_parser_expecting_record_type(self):
608
606
        parser = pack.ContainerPushParser()
609
 
        parser.accept_bytes(b"Bazaar pack format 1 (introduced in 0.18)\n")
 
607
        parser.accept_bytes("Bazaar pack format 1 (introduced in 0.18)\n")
610
608
        return parser
611
609
 
612
610
    def make_parser_expecting_bytes_record(self):
613
611
        parser = pack.ContainerPushParser()
614
 
        parser.accept_bytes(b"Bazaar pack format 1 (introduced in 0.18)\nB")
 
612
        parser.accept_bytes("Bazaar pack format 1 (introduced in 0.18)\nB")
615
613
        return parser
616
614
 
617
 
    def assertRecordParsing(self, expected_record, data):
 
615
    def assertRecordParsing(self, expected_record, bytes):
618
616
        """Assert that 'bytes' is parsed as a given bytes record.
619
617
 
620
618
        :param expected_record: A tuple of (names, bytes).
621
619
        """
622
620
        parser = self.make_parser_expecting_bytes_record()
623
 
        parser.accept_bytes(data)
 
621
        parser.accept_bytes(bytes)
624
622
        parsed_records = parser.read_pending_records()
625
623
        self.assertEqual([expected_record], parsed_records)
626
624
 
644
642
        (A naive implementation might stop after parsing the first record.)
645
643
        """
646
644
        parser = self.make_parser_expecting_record_type()
647
 
        parser.accept_bytes(b"B5\nname1\n\nbody1B5\nname2\n\nbody2")
 
645
        parser.accept_bytes("B5\nname1\n\nbody1B5\nname2\n\nbody2")
648
646
        self.assertEqual(
649
 
            [([(b'name1',)], b'body1'), ([(b'name2',)], b'body2')],
 
647
            [([('name1',)], 'body1'), ([('name2',)], 'body2')],
650
648
            parser.read_pending_records())
651
649
 
652
650
    def test_multiple_empty_records_at_once(self):
657
655
        record, because the buffer size had not changed.)
658
656
        """
659
657
        parser = self.make_parser_expecting_record_type()
660
 
        parser.accept_bytes(b"B0\nname1\n\nB0\nname2\n\n")
 
658
        parser.accept_bytes("B0\nname1\n\nB0\nname2\n\n")
661
659
        self.assertEqual(
662
 
            [([(b'name1',)], b''), ([(b'name2',)], b'')],
 
660
            [([('name1',)], ''), ([('name2',)], '')],
663
661
            parser.read_pending_records())
664
662
 
665
663
 
675
673
        """Reading a Bytes record with no name returns an empty list of
676
674
        names.
677
675
        """
678
 
        self.assertRecordParsing(([], b'aaaaa'), b"5\n\naaaaa")
 
676
        self.assertRecordParsing(([], 'aaaaa'), "5\n\naaaaa")
679
677
 
680
678
    def test_record_with_one_name(self):
681
679
        """Reading a Bytes record with one name returns a list of just that
682
680
        name.
683
681
        """
684
682
        self.assertRecordParsing(
685
 
            ([(b'name1', )], b'aaaaa'),
686
 
            b"5\nname1\n\naaaaa")
 
683
            ([('name1', )], 'aaaaa'),
 
684
            "5\nname1\n\naaaaa")
687
685
 
688
686
    def test_record_with_two_names(self):
689
687
        """Reading a Bytes record with two names returns a list of both names.
690
688
        """
691
689
        self.assertRecordParsing(
692
 
            ([(b'name1', ), (b'name2', )], b'aaaaa'),
693
 
            b"5\nname1\nname2\n\naaaaa")
 
690
            ([('name1', ), ('name2', )], 'aaaaa'),
 
691
            "5\nname1\nname2\n\naaaaa")
694
692
 
695
693
    def test_record_with_two_part_names(self):
696
694
        """Reading a Bytes record with a two_part name reads both."""
697
695
        self.assertRecordParsing(
698
 
            ([(b'name1', b'name2')], b'aaaaa'),
699
 
            b"5\nname1\x00name2\n\naaaaa")
 
696
            ([('name1', 'name2')], 'aaaaa'),
 
697
            "5\nname1\x00name2\n\naaaaa")
700
698
 
701
699
    def test_invalid_length(self):
702
700
        """If the length-prefix is not a number, parsing raises
704
702
        """
705
703
        parser = self.make_parser_expecting_bytes_record()
706
704
        self.assertRaises(
707
 
            errors.InvalidRecordError, parser.accept_bytes, b"not a number\n")
 
705
            errors.InvalidRecordError, parser.accept_bytes, "not a number\n")
708
706
 
709
707
    def test_incomplete_record(self):
710
708
        """If the bytes seen so far don't form a complete record, then there
711
709
        will be nothing returned by read_pending_records.
712
710
        """
713
711
        parser = self.make_parser_expecting_bytes_record()
714
 
        parser.accept_bytes(b"5\n\nabcd")
 
712
        parser.accept_bytes("5\n\nabcd")
715
713
        self.assertEqual([], parser.read_pending_records())
716
714
 
717
715
    def test_accept_nothing(self):
718
716
        """The edge case of parsing an empty string causes no error."""
719
717
        parser = self.make_parser_expecting_bytes_record()
720
 
        parser.accept_bytes(b"")
 
718
        parser.accept_bytes("")
721
719
 
722
 
    def assertInvalidRecord(self, data):
723
 
        """Assert that parsing the given bytes raises InvalidRecordError."""
 
720
    def assertInvalidRecord(self, bytes):
 
721
        """Assert that parsing the given bytes will raise an
 
722
        InvalidRecordError.
 
723
        """
724
724
        parser = self.make_parser_expecting_bytes_record()
725
725
        self.assertRaises(
726
 
            errors.InvalidRecordError, parser.accept_bytes, data)
 
726
            errors.InvalidRecordError, parser.accept_bytes, bytes)
727
727
 
728
728
    def test_read_invalid_name_whitespace(self):
729
729
        """Names must have no whitespace."""
730
730
        # A name with a space.
731
 
        self.assertInvalidRecord(b"0\nbad name\n\n")
 
731
        self.assertInvalidRecord("0\nbad name\n\n")
732
732
 
733
733
        # A name with a tab.
734
 
        self.assertInvalidRecord(b"0\nbad\tname\n\n")
 
734
        self.assertInvalidRecord("0\nbad\tname\n\n")
735
735
 
736
736
        # A name with a vertical tab.
737
 
        self.assertInvalidRecord(b"0\nbad\vname\n\n")
 
737
        self.assertInvalidRecord("0\nbad\vname\n\n")
738
738
 
739
739
    def test_repeated_read_pending_records(self):
740
740
        """read_pending_records will not return the same record twice."""
741
741
        parser = self.make_parser_expecting_bytes_record()
742
 
        parser.accept_bytes(b"6\n\nabcdef")
743
 
        self.assertEqual([([], b'abcdef')], parser.read_pending_records())
 
742
        parser.accept_bytes("6\n\nabcdef")
 
743
        self.assertEqual([([], 'abcdef')], parser.read_pending_records())
744
744
        self.assertEqual([], parser.read_pending_records())