/brz/remove-bazaar

To get this branch, use:
bzr branch http://gegoxaren.bato24.eu/bzr/brz/remove-bazaar

« back to all changes in this revision

Viewing changes to bzrlib/tests/test_pack.py

  • Committer: Marius Kruger
  • Date: 2010-07-10 21:28:56 UTC
  • mto: (5384.1.1 integration)
  • mto: This revision was merged to the branch mainline in revision 5385.
  • Revision ID: marius.kruger@enerweb.co.za-20100710212856-uq4ji3go0u5se7hx
* Update documentation
* add NEWS

Show diffs side-by-side

added added

removed removed

Lines of Context:
1
 
# Copyright (C) 2007, 2009, 2011, 2012, 2016 Canonical Ltd
 
1
# Copyright (C) 2007 Canonical Ltd
2
2
#
3
3
# This program is free software; you can redistribute it and/or modify
4
4
# it under the terms of the GNU General Public License as published by
14
14
# along with this program; if not, write to the Free Software
15
15
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
16
16
 
17
 
"""Tests for breezy.pack."""
18
 
 
19
 
from .. import errors, tests
20
 
from ..bzr import (
21
 
    pack,
22
 
    )
23
 
from ..sixish import (
24
 
    BytesIO,
25
 
    )
 
17
"""Tests for bzrlib.pack."""
 
18
 
 
19
 
 
20
from cStringIO import StringIO
 
21
 
 
22
from bzrlib import pack, errors, tests
26
23
 
27
24
 
28
25
class TestContainerSerialiser(tests.TestCase):
34
31
 
35
32
    def test_begin(self):
36
33
        serialiser = pack.ContainerSerialiser()
37
 
        self.assertEqual(b'Bazaar pack format 1 (introduced in 0.18)\n',
 
34
        self.assertEqual('Bazaar pack format 1 (introduced in 0.18)\n',
38
35
                         serialiser.begin())
39
36
 
40
37
    def test_end(self):
41
38
        serialiser = pack.ContainerSerialiser()
42
 
        self.assertEqual(b'E', serialiser.end())
 
39
        self.assertEqual('E', serialiser.end())
43
40
 
44
41
    def test_bytes_record_no_name(self):
45
42
        serialiser = pack.ContainerSerialiser()
46
 
        record = serialiser.bytes_record(b'bytes', [])
47
 
        self.assertEqual(b'B5\n\nbytes', record)
 
43
        record = serialiser.bytes_record('bytes', [])
 
44
        self.assertEqual('B5\n\nbytes', record)
48
45
 
49
46
    def test_bytes_record_one_name_with_one_part(self):
50
47
        serialiser = pack.ContainerSerialiser()
51
 
        record = serialiser.bytes_record(b'bytes', [(b'name',)])
52
 
        self.assertEqual(b'B5\nname\n\nbytes', record)
 
48
        record = serialiser.bytes_record('bytes', [('name',)])
 
49
        self.assertEqual('B5\nname\n\nbytes', record)
53
50
 
54
51
    def test_bytes_record_one_name_with_two_parts(self):
55
52
        serialiser = pack.ContainerSerialiser()
56
 
        record = serialiser.bytes_record(b'bytes', [(b'part1', b'part2')])
57
 
        self.assertEqual(b'B5\npart1\x00part2\n\nbytes', record)
 
53
        record = serialiser.bytes_record('bytes', [('part1', 'part2')])
 
54
        self.assertEqual('B5\npart1\x00part2\n\nbytes', record)
58
55
 
59
56
    def test_bytes_record_two_names(self):
60
57
        serialiser = pack.ContainerSerialiser()
61
 
        record = serialiser.bytes_record(b'bytes', [(b'name1',), (b'name2',)])
62
 
        self.assertEqual(b'B5\nname1\nname2\n\nbytes', record)
 
58
        record = serialiser.bytes_record('bytes', [('name1',), ('name2',)])
 
59
        self.assertEqual('B5\nname1\nname2\n\nbytes', record)
63
60
 
64
61
    def test_bytes_record_whitespace_in_name_part(self):
65
62
        serialiser = pack.ContainerSerialiser()
66
63
        self.assertRaises(
67
64
            errors.InvalidRecordError,
68
 
            serialiser.bytes_record, b'bytes', [(b'bad name',)])
69
 
 
70
 
    def test_bytes_record_header(self):
71
 
        serialiser = pack.ContainerSerialiser()
72
 
        record = serialiser.bytes_header(32, [(b'name1',), (b'name2',)])
73
 
        self.assertEqual(b'B32\nname1\nname2\n\n', record)
 
65
            serialiser.bytes_record, 'bytes', [('bad name',)])
74
66
 
75
67
 
76
68
class TestContainerWriter(tests.TestCase):
77
69
 
78
70
    def setUp(self):
79
 
        super(TestContainerWriter, self).setUp()
80
 
        self.output = BytesIO()
 
71
        tests.TestCase.setUp(self)
 
72
        self.output = StringIO()
81
73
        self.writer = pack.ContainerWriter(self.output.write)
82
74
 
83
75
    def assertOutput(self, expected_output):
92
84
        This uses None as the output stream to show that the constructor
93
85
        doesn't try to use the output stream.
94
86
        """
95
 
        pack.ContainerWriter(None)
 
87
        writer = pack.ContainerWriter(None)
96
88
 
97
89
    def test_begin(self):
98
90
        """The begin() method writes the container format marker line."""
99
91
        self.writer.begin()
100
 
        self.assertOutput(b'Bazaar pack format 1 (introduced in 0.18)\n')
 
92
        self.assertOutput('Bazaar pack format 1 (introduced in 0.18)\n')
101
93
 
102
94
    def test_zero_records_written_after_begin(self):
103
95
        """After begin is written, 0 records have been written."""
108
100
        """The end() method writes an End Marker record."""
109
101
        self.writer.begin()
110
102
        self.writer.end()
111
 
        self.assertOutput(b'Bazaar pack format 1 (introduced in 0.18)\nE')
 
103
        self.assertOutput('Bazaar pack format 1 (introduced in 0.18)\nE')
112
104
 
113
105
    def test_empty_end_does_not_add_a_record_to_records_written(self):
114
106
        """The end() method does not count towards the records written."""
119
111
    def test_non_empty_end_does_not_add_a_record_to_records_written(self):
120
112
        """The end() method does not count towards the records written."""
121
113
        self.writer.begin()
122
 
        self.writer.add_bytes_record([b'foo'], len(b'foo'), names=[])
 
114
        self.writer.add_bytes_record('foo', names=[])
123
115
        self.writer.end()
124
116
        self.assertEqual(1, self.writer.records_written)
125
117
 
126
118
    def test_add_bytes_record_no_name(self):
127
119
        """Add a bytes record with no name."""
128
120
        self.writer.begin()
129
 
        offset, length = self.writer.add_bytes_record([b'abc'], len(b'abc'), names=[])
 
121
        offset, length = self.writer.add_bytes_record('abc', names=[])
130
122
        self.assertEqual((42, 7), (offset, length))
131
123
        self.assertOutput(
132
 
            b'Bazaar pack format 1 (introduced in 0.18)\nB3\n\nabc')
 
124
            'Bazaar pack format 1 (introduced in 0.18)\nB3\n\nabc')
133
125
 
134
126
    def test_add_bytes_record_one_name(self):
135
127
        """Add a bytes record with one name."""
136
128
        self.writer.begin()
137
 
 
138
129
        offset, length = self.writer.add_bytes_record(
139
 
            [b'abc'], len(b'abc'), names=[(b'name1', )])
 
130
            'abc', names=[('name1', )])
140
131
        self.assertEqual((42, 13), (offset, length))
141
132
        self.assertOutput(
142
 
            b'Bazaar pack format 1 (introduced in 0.18)\n'
143
 
            b'B3\nname1\n\nabc')
144
 
 
145
 
    def test_add_bytes_record_split_writes(self):
146
 
        """Write a large record which does multiple IOs"""
147
 
 
148
 
        writes = []
149
 
        real_write = self.writer.write_func
150
 
 
151
 
        def record_writes(data):
152
 
            writes.append(data)
153
 
            return real_write(data)
154
 
 
155
 
        self.writer.write_func = record_writes
156
 
        self.writer._JOIN_WRITES_THRESHOLD = 2
157
 
 
158
 
        self.writer.begin()
159
 
        offset, length = self.writer.add_bytes_record(
160
 
            [b'abcabc'], len(b'abcabc'), names=[(b'name1', )])
161
 
        self.assertEqual((42, 16), (offset, length))
162
 
        self.assertOutput(
163
 
            b'Bazaar pack format 1 (introduced in 0.18)\n'
164
 
            b'B6\nname1\n\nabcabc')
165
 
 
166
 
        self.assertEqual([
167
 
            b'Bazaar pack format 1 (introduced in 0.18)\n',
168
 
            b'B6\nname1\n\n',
169
 
            b'abcabc'],
170
 
            writes)
171
 
 
172
 
    def test_add_bytes_record_two_names(self):
173
 
        """Add a bytes record with two names."""
174
 
        self.writer.begin()
175
 
        offset, length = self.writer.add_bytes_record(
176
 
            [b'abc'], len(b'abc'), names=[(b'name1', ), (b'name2', )])
177
 
        self.assertEqual((42, 19), (offset, length))
178
 
        self.assertOutput(
179
 
            b'Bazaar pack format 1 (introduced in 0.18)\n'
180
 
            b'B3\nname1\nname2\n\nabc')
181
 
 
182
 
    def test_add_bytes_record_two_names(self):
183
 
        """Add a bytes record with two names."""
184
 
        self.writer.begin()
185
 
        offset, length = self.writer.add_bytes_record(
186
 
            [b'abc'], len(b'abc'), names=[(b'name1', ), (b'name2', )])
187
 
        self.assertEqual((42, 19), (offset, length))
188
 
        self.assertOutput(
189
 
            b'Bazaar pack format 1 (introduced in 0.18)\n'
190
 
            b'B3\nname1\nname2\n\nabc')
 
133
            'Bazaar pack format 1 (introduced in 0.18)\n'
 
134
            'B3\nname1\n\nabc')
 
135
 
 
136
    def test_add_bytes_record_two_names(self):
 
137
        """Add a bytes record with two names."""
 
138
        self.writer.begin()
 
139
        offset, length = self.writer.add_bytes_record(
 
140
            'abc', names=[('name1', ), ('name2', )])
 
141
        self.assertEqual((42, 19), (offset, length))
 
142
        self.assertOutput(
 
143
            'Bazaar pack format 1 (introduced in 0.18)\n'
 
144
            'B3\nname1\nname2\n\nabc')
 
145
 
 
146
    def test_add_bytes_record_two_names(self):
 
147
        """Add a bytes record with two names."""
 
148
        self.writer.begin()
 
149
        offset, length = self.writer.add_bytes_record(
 
150
            'abc', names=[('name1', ), ('name2', )])
 
151
        self.assertEqual((42, 19), (offset, length))
 
152
        self.assertOutput(
 
153
            'Bazaar pack format 1 (introduced in 0.18)\n'
 
154
            'B3\nname1\nname2\n\nabc')
191
155
 
192
156
    def test_add_bytes_record_two_element_name(self):
193
157
        """Add a bytes record with a two-element name."""
194
158
        self.writer.begin()
195
159
        offset, length = self.writer.add_bytes_record(
196
 
            [b'abc'], len(b'abc'), names=[(b'name1', b'name2')])
 
160
            'abc', names=[('name1', 'name2')])
197
161
        self.assertEqual((42, 19), (offset, length))
198
162
        self.assertOutput(
199
 
            b'Bazaar pack format 1 (introduced in 0.18)\n'
200
 
            b'B3\nname1\x00name2\n\nabc')
 
163
            'Bazaar pack format 1 (introduced in 0.18)\n'
 
164
            'B3\nname1\x00name2\n\nabc')
201
165
 
202
166
    def test_add_second_bytes_record_gets_higher_offset(self):
203
167
        self.writer.begin()
204
 
        self.writer.add_bytes_record([b'a', b'bc'], len(b'abc'), names=[])
205
 
        offset, length = self.writer.add_bytes_record([b'abc'], len(b'abc'), names=[])
 
168
        self.writer.add_bytes_record('abc', names=[])
 
169
        offset, length = self.writer.add_bytes_record('abc', names=[])
206
170
        self.assertEqual((49, 7), (offset, length))
207
171
        self.assertOutput(
208
 
            b'Bazaar pack format 1 (introduced in 0.18)\n'
209
 
            b'B3\n\nabc'
210
 
            b'B3\n\nabc')
 
172
            'Bazaar pack format 1 (introduced in 0.18)\n'
 
173
            'B3\n\nabc'
 
174
            'B3\n\nabc')
211
175
 
212
176
    def test_add_bytes_record_invalid_name(self):
213
177
        """Adding a Bytes record with a name with whitespace in it raises
216
180
        self.writer.begin()
217
181
        self.assertRaises(
218
182
            errors.InvalidRecordError,
219
 
            self.writer.add_bytes_record, [b'abc'], len(b'abc'), names=[(b'bad name', )])
 
183
            self.writer.add_bytes_record, 'abc', names=[('bad name', )])
220
184
 
221
185
    def test_add_bytes_records_add_to_records_written(self):
222
186
        """Adding a Bytes record increments the records_written counter."""
223
187
        self.writer.begin()
224
 
        self.writer.add_bytes_record([b'foo'], len(b'foo'), names=[])
 
188
        self.writer.add_bytes_record('foo', names=[])
225
189
        self.assertEqual(1, self.writer.records_written)
226
 
        self.writer.add_bytes_record([b'foo'], len(b'foo'), names=[])
 
190
        self.writer.add_bytes_record('foo', names=[])
227
191
        self.assertEqual(2, self.writer.records_written)
228
192
 
229
193
 
235
199
    added, then separate tests for that format should be added.
236
200
    """
237
201
 
238
 
    def get_reader_for(self, data):
239
 
        stream = BytesIO(data)
 
202
    def get_reader_for(self, bytes):
 
203
        stream = StringIO(bytes)
240
204
        reader = pack.ContainerReader(stream)
241
205
        return reader
242
206
 
243
207
    def test_construct(self):
244
208
        """Test constructing a ContainerReader.
245
209
 
246
 
        This uses None as the output stream to show that the constructor
247
 
        doesn't try to use the input stream.
 
210
        This uses None as the output stream to show that the constructor doesn't
 
211
        try to use the input stream.
248
212
        """
249
 
        pack.ContainerReader(None)
 
213
        reader = pack.ContainerReader(None)
250
214
 
251
215
    def test_empty_container(self):
252
216
        """Read an empty container."""
253
217
        reader = self.get_reader_for(
254
 
            b"Bazaar pack format 1 (introduced in 0.18)\nE")
 
218
            "Bazaar pack format 1 (introduced in 0.18)\nE")
255
219
        self.assertEqual([], list(reader.iter_records()))
256
220
 
257
221
    def test_unknown_format(self):
258
222
        """Unrecognised container formats raise UnknownContainerFormatError."""
259
 
        reader = self.get_reader_for(b"unknown format\n")
 
223
        reader = self.get_reader_for("unknown format\n")
260
224
        self.assertRaises(
261
225
            errors.UnknownContainerFormatError, reader.iter_records)
262
226
 
265
229
        UnexpectedEndOfContainerError to be raised.
266
230
        """
267
231
        reader = self.get_reader_for(
268
 
            b"Bazaar pack format 1 (introduced in 0.18)\n")
 
232
            "Bazaar pack format 1 (introduced in 0.18)\n")
269
233
        iterator = reader.iter_records()
270
234
        self.assertRaises(
271
 
            errors.UnexpectedEndOfContainerError, next, iterator)
 
235
            errors.UnexpectedEndOfContainerError, iterator.next)
272
236
 
273
237
    def test_unknown_record_type(self):
274
238
        """Unknown record types cause UnknownRecordTypeError to be raised."""
275
239
        reader = self.get_reader_for(
276
 
            b"Bazaar pack format 1 (introduced in 0.18)\nX")
 
240
            "Bazaar pack format 1 (introduced in 0.18)\nX")
277
241
        iterator = reader.iter_records()
278
242
        self.assertRaises(
279
 
            errors.UnknownRecordTypeError, next, iterator)
 
243
            errors.UnknownRecordTypeError, iterator.next)
280
244
 
281
245
    def test_container_with_one_unnamed_record(self):
282
246
        """Read a container with one Bytes record.
286
250
        ContainerReader's integration with BytesRecordReader is working.
287
251
        """
288
252
        reader = self.get_reader_for(
289
 
            b"Bazaar pack format 1 (introduced in 0.18)\nB5\n\naaaaaE")
290
 
        expected_records = [([], b'aaaaa')]
 
253
            "Bazaar pack format 1 (introduced in 0.18)\nB5\n\naaaaaE")
 
254
        expected_records = [([], 'aaaaa')]
291
255
        self.assertEqual(
292
256
            expected_records,
293
257
            [(names, read_bytes(None))
295
259
 
296
260
    def test_validate_empty_container(self):
297
261
        """validate does not raise an error for a container with no records."""
298
 
        reader = self.get_reader_for(
299
 
            b"Bazaar pack format 1 (introduced in 0.18)\nE")
 
262
        reader = self.get_reader_for("Bazaar pack format 1 (introduced in 0.18)\nE")
300
263
        # No exception raised
301
264
        reader.validate()
302
265
 
304
267
        """validate does not raise an error for a container with a valid record.
305
268
        """
306
269
        reader = self.get_reader_for(
307
 
            b"Bazaar pack format 1 (introduced in 0.18)\nB3\nname\n\nabcE")
 
270
            "Bazaar pack format 1 (introduced in 0.18)\nB3\nname\n\nabcE")
308
271
        # No exception raised
309
272
        reader.validate()
310
273
 
314
277
        It may raise either UnexpectedEndOfContainerError or
315
278
        UnknownContainerFormatError, depending on exactly what the string is.
316
279
        """
317
 
        inputs = [
318
 
            b"", b"x", b"Bazaar pack format 1 (introduced in 0.18)", b"bad\n"]
 
280
        inputs = ["", "x", "Bazaar pack format 1 (introduced in 0.18)", "bad\n"]
319
281
        for input in inputs:
320
282
            reader = self.get_reader_for(input)
321
283
            self.assertRaises(
328
290
        types.
329
291
        """
330
292
        reader = self.get_reader_for(
331
 
            b"Bazaar pack format 1 (introduced in 0.18)\nX")
 
293
            "Bazaar pack format 1 (introduced in 0.18)\nX")
332
294
        self.assertRaises(errors.UnknownRecordTypeError, reader.validate)
333
295
 
334
296
    def test_validate_data_after_end_marker(self):
336
298
        after the end of the container.
337
299
        """
338
300
        reader = self.get_reader_for(
339
 
            b"Bazaar pack format 1 (introduced in 0.18)\nEcrud")
 
301
            "Bazaar pack format 1 (introduced in 0.18)\nEcrud")
340
302
        self.assertRaises(
341
303
            errors.ContainerHasExcessDataError, reader.validate)
342
304
 
343
305
    def test_validate_no_end_marker(self):
344
306
        """validate raises UnexpectedEndOfContainerError if there's no end of
345
 
        container marker, even if the container up to this point has been
346
 
        valid.
 
307
        container marker, even if the container up to this point has been valid.
347
308
        """
348
309
        reader = self.get_reader_for(
349
 
            b"Bazaar pack format 1 (introduced in 0.18)\n")
 
310
            "Bazaar pack format 1 (introduced in 0.18)\n")
350
311
        self.assertRaises(
351
312
            errors.UnexpectedEndOfContainerError, reader.validate)
352
313
 
355
316
        multiple times in the container.
356
317
        """
357
318
        reader = self.get_reader_for(
358
 
            b"Bazaar pack format 1 (introduced in 0.18)\n"
359
 
            b"B0\nname\n\n"
360
 
            b"B0\nname\n\n"
361
 
            b"E")
 
319
            "Bazaar pack format 1 (introduced in 0.18)\n"
 
320
            "B0\nname\n\n"
 
321
            "B0\nname\n\n"
 
322
            "E")
362
323
        self.assertRaises(errors.DuplicateRecordNameError, reader.validate)
363
324
 
364
325
    def test_validate_undecodeable_name(self):
365
326
        """Names that aren't valid UTF-8 cause validate to fail."""
366
327
        reader = self.get_reader_for(
367
 
            b"Bazaar pack format 1 (introduced in 0.18)\nB0\n\xcc\n\nE")
 
328
            "Bazaar pack format 1 (introduced in 0.18)\nB0\n\xcc\n\nE")
368
329
        self.assertRaises(errors.InvalidRecordError, reader.validate)
369
330
 
370
331
 
377
338
    tests for reading that format should be added.
378
339
    """
379
340
 
380
 
    def get_reader_for(self, data):
381
 
        stream = BytesIO(data)
 
341
    def get_reader_for(self, bytes):
 
342
        stream = StringIO(bytes)
382
343
        reader = pack.BytesRecordReader(stream)
383
344
        return reader
384
345
 
386
347
        """Reading a Bytes record with no name returns an empty list of
387
348
        names.
388
349
        """
389
 
        reader = self.get_reader_for(b"5\n\naaaaa")
 
350
        reader = self.get_reader_for("5\n\naaaaa")
390
351
        names, get_bytes = reader.read()
391
352
        self.assertEqual([], names)
392
 
        self.assertEqual(b'aaaaa', get_bytes(None))
 
353
        self.assertEqual('aaaaa', get_bytes(None))
393
354
 
394
355
    def test_record_with_one_name(self):
395
356
        """Reading a Bytes record with one name returns a list of just that
396
357
        name.
397
358
        """
398
 
        reader = self.get_reader_for(b"5\nname1\n\naaaaa")
 
359
        reader = self.get_reader_for("5\nname1\n\naaaaa")
399
360
        names, get_bytes = reader.read()
400
 
        self.assertEqual([(b'name1', )], names)
401
 
        self.assertEqual(b'aaaaa', get_bytes(None))
 
361
        self.assertEqual([('name1', )], names)
 
362
        self.assertEqual('aaaaa', get_bytes(None))
402
363
 
403
364
    def test_record_with_two_names(self):
404
365
        """Reading a Bytes record with two names returns a list of both names.
405
366
        """
406
 
        reader = self.get_reader_for(b"5\nname1\nname2\n\naaaaa")
 
367
        reader = self.get_reader_for("5\nname1\nname2\n\naaaaa")
407
368
        names, get_bytes = reader.read()
408
 
        self.assertEqual([(b'name1', ), (b'name2', )], names)
409
 
        self.assertEqual(b'aaaaa', get_bytes(None))
 
369
        self.assertEqual([('name1', ), ('name2', )], names)
 
370
        self.assertEqual('aaaaa', get_bytes(None))
410
371
 
411
372
    def test_record_with_two_part_names(self):
412
373
        """Reading a Bytes record with a two_part name reads both."""
413
 
        reader = self.get_reader_for(b"5\nname1\x00name2\n\naaaaa")
 
374
        reader = self.get_reader_for("5\nname1\x00name2\n\naaaaa")
414
375
        names, get_bytes = reader.read()
415
 
        self.assertEqual([(b'name1', b'name2', )], names)
416
 
        self.assertEqual(b'aaaaa', get_bytes(None))
 
376
        self.assertEqual([('name1', 'name2', )], names)
 
377
        self.assertEqual('aaaaa', get_bytes(None))
417
378
 
418
379
    def test_invalid_length(self):
419
380
        """If the length-prefix is not a number, parsing raises
420
381
        InvalidRecordError.
421
382
        """
422
 
        reader = self.get_reader_for(b"not a number\n")
 
383
        reader = self.get_reader_for("not a number\n")
423
384
        self.assertRaises(errors.InvalidRecordError, reader.read)
424
385
 
425
386
    def test_early_eof(self):
432
393
 
433
394
        In all cases, UnexpectedEndOfContainerError should be raised.
434
395
        """
435
 
        complete_record = b"6\nname\n\nabcdef"
 
396
        complete_record = "6\nname\n\nabcdef"
436
397
        for count in range(0, len(complete_record)):
437
398
            incomplete_record = complete_record[:count]
438
399
            reader = self.get_reader_for(incomplete_record)
450
411
 
451
412
    def test_initial_eof(self):
452
413
        """EOF before any bytes read at all."""
453
 
        reader = self.get_reader_for(b"")
 
414
        reader = self.get_reader_for("")
454
415
        self.assertRaises(errors.UnexpectedEndOfContainerError, reader.read)
455
416
 
456
417
    def test_eof_after_length(self):
457
418
        """EOF after reading the length and before reading name(s)."""
458
 
        reader = self.get_reader_for(b"123\n")
 
419
        reader = self.get_reader_for("123\n")
459
420
        self.assertRaises(errors.UnexpectedEndOfContainerError, reader.read)
460
421
 
461
422
    def test_eof_during_name(self):
462
423
        """EOF during reading a name."""
463
 
        reader = self.get_reader_for(b"123\nname")
 
424
        reader = self.get_reader_for("123\nname")
464
425
        self.assertRaises(errors.UnexpectedEndOfContainerError, reader.read)
465
426
 
466
427
    def test_read_invalid_name_whitespace(self):
467
428
        """Names must have no whitespace."""
468
429
        # A name with a space.
469
 
        reader = self.get_reader_for(b"0\nbad name\n\n")
 
430
        reader = self.get_reader_for("0\nbad name\n\n")
470
431
        self.assertRaises(errors.InvalidRecordError, reader.read)
471
432
 
472
433
        # A name with a tab.
473
 
        reader = self.get_reader_for(b"0\nbad\tname\n\n")
 
434
        reader = self.get_reader_for("0\nbad\tname\n\n")
474
435
        self.assertRaises(errors.InvalidRecordError, reader.read)
475
436
 
476
437
        # A name with a vertical tab.
477
 
        reader = self.get_reader_for(b"0\nbad\vname\n\n")
 
438
        reader = self.get_reader_for("0\nbad\vname\n\n")
478
439
        self.assertRaises(errors.InvalidRecordError, reader.read)
479
440
 
480
441
    def test_validate_whitespace_in_name(self):
481
442
        """Names must have no whitespace."""
482
 
        reader = self.get_reader_for(b"0\nbad name\n\n")
 
443
        reader = self.get_reader_for("0\nbad name\n\n")
483
444
        self.assertRaises(errors.InvalidRecordError, reader.validate)
484
445
 
485
446
    def test_validate_interrupted_prelude(self):
486
447
        """EOF during reading a record's prelude causes validate to fail."""
487
 
        reader = self.get_reader_for(b"")
 
448
        reader = self.get_reader_for("")
488
449
        self.assertRaises(
489
450
            errors.UnexpectedEndOfContainerError, reader.validate)
490
451
 
491
452
    def test_validate_interrupted_body(self):
492
453
        """EOF during reading a record's body causes validate to fail."""
493
 
        reader = self.get_reader_for(b"1\n\n")
 
454
        reader = self.get_reader_for("1\n\n")
494
455
        self.assertRaises(
495
456
            errors.UnexpectedEndOfContainerError, reader.validate)
496
457
 
497
458
    def test_validate_unparseable_length(self):
498
459
        """An unparseable record length causes validate to fail."""
499
 
        reader = self.get_reader_for(b"\n\n")
 
460
        reader = self.get_reader_for("\n\n")
500
461
        self.assertRaises(
501
462
            errors.InvalidRecordError, reader.validate)
502
463
 
503
464
    def test_validate_undecodeable_name(self):
504
465
        """Names that aren't valid UTF-8 cause validate to fail."""
505
 
        reader = self.get_reader_for(b"0\n\xcc\n\n")
 
466
        reader = self.get_reader_for("0\n\xcc\n\n")
506
467
        self.assertRaises(errors.InvalidRecordError, reader.validate)
507
468
 
508
469
    def test_read_max_length(self):
509
470
        """If the max_length passed to the callable returned by read is not
510
471
        None, then no more than that many bytes will be read.
511
472
        """
512
 
        reader = self.get_reader_for(b"6\n\nabcdef")
 
473
        reader = self.get_reader_for("6\n\nabcdef")
513
474
        names, get_bytes = reader.read()
514
 
        self.assertEqual(b'abc', get_bytes(3))
 
475
        self.assertEqual('abc', get_bytes(3))
515
476
 
516
477
    def test_read_no_max_length(self):
517
478
        """If the max_length passed to the callable returned by read is None,
518
479
        then all the bytes in the record will be read.
519
480
        """
520
 
        reader = self.get_reader_for(b"6\n\nabcdef")
 
481
        reader = self.get_reader_for("6\n\nabcdef")
521
482
        names, get_bytes = reader.read()
522
 
        self.assertEqual(b'abcdef', get_bytes(None))
 
483
        self.assertEqual('abcdef', get_bytes(None))
523
484
 
524
485
    def test_repeated_read_calls(self):
525
486
        """Repeated calls to the callable returned from BytesRecordReader.read
526
487
        will not read beyond the end of the record.
527
488
        """
528
 
        reader = self.get_reader_for(b"6\n\nabcdefB3\nnext-record\nXXX")
 
489
        reader = self.get_reader_for("6\n\nabcdefB3\nnext-record\nXXX")
529
490
        names, get_bytes = reader.read()
530
 
        self.assertEqual(b'abcdef', get_bytes(None))
531
 
        self.assertEqual(b'', get_bytes(None))
532
 
        self.assertEqual(b'', get_bytes(99))
 
491
        self.assertEqual('abcdef', get_bytes(None))
 
492
        self.assertEqual('', get_bytes(None))
 
493
        self.assertEqual('', get_bytes(99))
533
494
 
534
495
 
535
496
class TestMakeReadvReader(tests.TestCaseWithTransport):
536
497
 
537
498
    def test_read_skipping_records(self):
538
 
        pack_data = BytesIO()
 
499
        pack_data = StringIO()
539
500
        writer = pack.ContainerWriter(pack_data.write)
540
501
        writer.begin()
541
502
        memos = []
542
 
        memos.append(writer.add_bytes_record([b'abc'], 3, names=[]))
543
 
        memos.append(writer.add_bytes_record([b'def'], 3, names=[(b'name1', )]))
544
 
        memos.append(writer.add_bytes_record([b'ghi'], 3, names=[(b'name2', )]))
545
 
        memos.append(writer.add_bytes_record([b'jkl'], 3, names=[]))
 
503
        memos.append(writer.add_bytes_record('abc', names=[]))
 
504
        memos.append(writer.add_bytes_record('def', names=[('name1', )]))
 
505
        memos.append(writer.add_bytes_record('ghi', names=[('name2', )]))
 
506
        memos.append(writer.add_bytes_record('jkl', names=[]))
546
507
        writer.end()
547
508
        transport = self.get_transport()
548
509
        transport.put_bytes('mypack', pack_data.getvalue())
551
512
        result = []
552
513
        for names, reader_func in reader.iter_records():
553
514
            result.append((names, reader_func(None)))
554
 
        self.assertEqual([([], b'abc'), ([(b'name2', )], b'ghi')], result)
 
515
        self.assertEqual([([], 'abc'), ([('name2', )], 'ghi')], result)
555
516
 
556
517
 
557
518
class TestReadvFile(tests.TestCaseWithTransport):
565
526
    def test_read_bytes(self):
566
527
        """Test reading of both single bytes and all bytes in a hunk."""
567
528
        transport = self.get_transport()
568
 
        transport.put_bytes('sample', b'0123456789')
569
 
        f = pack.ReadVFile(transport.readv(
570
 
            'sample', [(0, 1), (1, 2), (4, 1), (6, 2)]))
 
529
        transport.put_bytes('sample', '0123456789')
 
530
        f = pack.ReadVFile(transport.readv('sample', [(0,1), (1,2), (4,1), (6,2)]))
571
531
        results = []
572
532
        results.append(f.read(1))
573
533
        results.append(f.read(2))
574
534
        results.append(f.read(1))
575
535
        results.append(f.read(1))
576
536
        results.append(f.read(1))
577
 
        self.assertEqual([b'0', b'12', b'4', b'6', b'7'], results)
 
537
        self.assertEqual(['0', '12', '4', '6', '7'], results)
578
538
 
579
539
    def test_readline(self):
580
540
        """Test using readline() as ContainerReader does.
582
542
        This is always within a readv hunk, never across it.
583
543
        """
584
544
        transport = self.get_transport()
585
 
        transport.put_bytes('sample', b'0\n2\n4\n')
586
 
        f = pack.ReadVFile(transport.readv('sample', [(0, 2), (2, 4)]))
 
545
        transport.put_bytes('sample', '0\n2\n4\n')
 
546
        f = pack.ReadVFile(transport.readv('sample', [(0,2), (2,4)]))
587
547
        results = []
588
548
        results.append(f.readline())
589
549
        results.append(f.readline())
590
550
        results.append(f.readline())
591
 
        self.assertEqual([b'0\n', b'2\n', b'4\n'], results)
 
551
        self.assertEqual(['0\n', '2\n', '4\n'], results)
592
552
 
593
553
    def test_readline_and_read(self):
594
554
        """Test exercising one byte reads, readline, and then read again."""
595
555
        transport = self.get_transport()
596
 
        transport.put_bytes('sample', b'0\n2\n4\n')
597
 
        f = pack.ReadVFile(transport.readv('sample', [(0, 6)]))
 
556
        transport.put_bytes('sample', '0\n2\n4\n')
 
557
        f = pack.ReadVFile(transport.readv('sample', [(0,6)]))
598
558
        results = []
599
559
        results.append(f.read(1))
600
560
        results.append(f.readline())
601
561
        results.append(f.read(4))
602
 
        self.assertEqual([b'0', b'\n', b'2\n4\n'], results)
 
562
        self.assertEqual(['0', '\n', '2\n4\n'], results)
603
563
 
604
564
 
605
565
class PushParserTestCase(tests.TestCase):
607
567
 
608
568
    def make_parser_expecting_record_type(self):
609
569
        parser = pack.ContainerPushParser()
610
 
        parser.accept_bytes(b"Bazaar pack format 1 (introduced in 0.18)\n")
 
570
        parser.accept_bytes("Bazaar pack format 1 (introduced in 0.18)\n")
611
571
        return parser
612
572
 
613
573
    def make_parser_expecting_bytes_record(self):
614
574
        parser = pack.ContainerPushParser()
615
 
        parser.accept_bytes(b"Bazaar pack format 1 (introduced in 0.18)\nB")
 
575
        parser.accept_bytes("Bazaar pack format 1 (introduced in 0.18)\nB")
616
576
        return parser
617
577
 
618
 
    def assertRecordParsing(self, expected_record, data):
 
578
    def assertRecordParsing(self, expected_record, bytes):
619
579
        """Assert that 'bytes' is parsed as a given bytes record.
620
580
 
621
581
        :param expected_record: A tuple of (names, bytes).
622
582
        """
623
583
        parser = self.make_parser_expecting_bytes_record()
624
 
        parser.accept_bytes(data)
 
584
        parser.accept_bytes(bytes)
625
585
        parsed_records = parser.read_pending_records()
626
586
        self.assertEqual([expected_record], parsed_records)
627
587
 
645
605
        (A naive implementation might stop after parsing the first record.)
646
606
        """
647
607
        parser = self.make_parser_expecting_record_type()
648
 
        parser.accept_bytes(b"B5\nname1\n\nbody1B5\nname2\n\nbody2")
 
608
        parser.accept_bytes("B5\nname1\n\nbody1B5\nname2\n\nbody2")
649
609
        self.assertEqual(
650
 
            [([(b'name1',)], b'body1'), ([(b'name2',)], b'body2')],
 
610
            [([('name1',)], 'body1'), ([('name2',)], 'body2')],
651
611
            parser.read_pending_records())
652
612
 
653
613
    def test_multiple_empty_records_at_once(self):
658
618
        record, because the buffer size had not changed.)
659
619
        """
660
620
        parser = self.make_parser_expecting_record_type()
661
 
        parser.accept_bytes(b"B0\nname1\n\nB0\nname2\n\n")
 
621
        parser.accept_bytes("B0\nname1\n\nB0\nname2\n\n")
662
622
        self.assertEqual(
663
 
            [([(b'name1',)], b''), ([(b'name2',)], b'')],
 
623
            [([('name1',)], ''), ([('name2',)], '')],
664
624
            parser.read_pending_records())
665
625
 
666
626
 
676
636
        """Reading a Bytes record with no name returns an empty list of
677
637
        names.
678
638
        """
679
 
        self.assertRecordParsing(([], b'aaaaa'), b"5\n\naaaaa")
 
639
        self.assertRecordParsing(([], 'aaaaa'), "5\n\naaaaa")
680
640
 
681
641
    def test_record_with_one_name(self):
682
642
        """Reading a Bytes record with one name returns a list of just that
683
643
        name.
684
644
        """
685
645
        self.assertRecordParsing(
686
 
            ([(b'name1', )], b'aaaaa'),
687
 
            b"5\nname1\n\naaaaa")
 
646
            ([('name1', )], 'aaaaa'),
 
647
            "5\nname1\n\naaaaa")
688
648
 
689
649
    def test_record_with_two_names(self):
690
650
        """Reading a Bytes record with two names returns a list of both names.
691
651
        """
692
652
        self.assertRecordParsing(
693
 
            ([(b'name1', ), (b'name2', )], b'aaaaa'),
694
 
            b"5\nname1\nname2\n\naaaaa")
 
653
            ([('name1', ), ('name2', )], 'aaaaa'),
 
654
            "5\nname1\nname2\n\naaaaa")
695
655
 
696
656
    def test_record_with_two_part_names(self):
697
657
        """Reading a Bytes record with a two_part name reads both."""
698
658
        self.assertRecordParsing(
699
 
            ([(b'name1', b'name2')], b'aaaaa'),
700
 
            b"5\nname1\x00name2\n\naaaaa")
 
659
            ([('name1', 'name2')], 'aaaaa'),
 
660
            "5\nname1\x00name2\n\naaaaa")
701
661
 
702
662
    def test_invalid_length(self):
703
663
        """If the length-prefix is not a number, parsing raises
705
665
        """
706
666
        parser = self.make_parser_expecting_bytes_record()
707
667
        self.assertRaises(
708
 
            errors.InvalidRecordError, parser.accept_bytes, b"not a number\n")
 
668
            errors.InvalidRecordError, parser.accept_bytes, "not a number\n")
709
669
 
710
670
    def test_incomplete_record(self):
711
671
        """If the bytes seen so far don't form a complete record, then there
712
672
        will be nothing returned by read_pending_records.
713
673
        """
714
674
        parser = self.make_parser_expecting_bytes_record()
715
 
        parser.accept_bytes(b"5\n\nabcd")
 
675
        parser.accept_bytes("5\n\nabcd")
716
676
        self.assertEqual([], parser.read_pending_records())
717
677
 
718
678
    def test_accept_nothing(self):
719
679
        """The edge case of parsing an empty string causes no error."""
720
680
        parser = self.make_parser_expecting_bytes_record()
721
 
        parser.accept_bytes(b"")
 
681
        parser.accept_bytes("")
722
682
 
723
 
    def assertInvalidRecord(self, data):
724
 
        """Assert that parsing the given bytes raises InvalidRecordError."""
 
683
    def assertInvalidRecord(self, bytes):
 
684
        """Assert that parsing the given bytes will raise an
 
685
        InvalidRecordError.
 
686
        """
725
687
        parser = self.make_parser_expecting_bytes_record()
726
688
        self.assertRaises(
727
 
            errors.InvalidRecordError, parser.accept_bytes, data)
 
689
            errors.InvalidRecordError, parser.accept_bytes, bytes)
728
690
 
729
691
    def test_read_invalid_name_whitespace(self):
730
692
        """Names must have no whitespace."""
731
693
        # A name with a space.
732
 
        self.assertInvalidRecord(b"0\nbad name\n\n")
 
694
        self.assertInvalidRecord("0\nbad name\n\n")
733
695
 
734
696
        # A name with a tab.
735
 
        self.assertInvalidRecord(b"0\nbad\tname\n\n")
 
697
        self.assertInvalidRecord("0\nbad\tname\n\n")
736
698
 
737
699
        # A name with a vertical tab.
738
 
        self.assertInvalidRecord(b"0\nbad\vname\n\n")
 
700
        self.assertInvalidRecord("0\nbad\vname\n\n")
739
701
 
740
702
    def test_repeated_read_pending_records(self):
741
703
        """read_pending_records will not return the same record twice."""
742
704
        parser = self.make_parser_expecting_bytes_record()
743
 
        parser.accept_bytes(b"6\n\nabcdef")
744
 
        self.assertEqual([([], b'abcdef')], parser.read_pending_records())
 
705
        parser.accept_bytes("6\n\nabcdef")
 
706
        self.assertEqual([([], 'abcdef')], parser.read_pending_records())
745
707
        self.assertEqual([], parser.read_pending_records())
 
708
 
 
709