/brz/remove-bazaar

To get this branch, use:
bzr branch http://gegoxaren.bato24.eu/bzr/brz/remove-bazaar

« back to all changes in this revision

Viewing changes to breezy/tests/test_pack.py

  • Committer: John Arbash Meinel
  • Date: 2006-04-25 15:05:42 UTC
  • mfrom: (1185.85.85 bzr-encoding)
  • mto: This revision was merged to the branch mainline in revision 1752.
  • Revision ID: john@arbash-meinel.com-20060425150542-c7b518dca9928691
[merge] the old bzr-encoding changes, reparenting them on bzr.dev

Show diffs side-by-side

added added

removed removed

Lines of Context:
1
 
# Copyright (C) 2007, 2009, 2011, 2012, 2016 Canonical Ltd
2
 
#
3
 
# This program is free software; you can redistribute it and/or modify
4
 
# it under the terms of the GNU General Public License as published by
5
 
# the Free Software Foundation; either version 2 of the License, or
6
 
# (at your option) any later version.
7
 
#
8
 
# This program is distributed in the hope that it will be useful,
9
 
# but WITHOUT ANY WARRANTY; without even the implied warranty of
10
 
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
11
 
# GNU General Public License for more details.
12
 
#
13
 
# You should have received a copy of the GNU General Public License
14
 
# along with this program; if not, write to the Free Software
15
 
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
16
 
 
17
 
"""Tests for breezy.pack."""
18
 
 
19
 
from .. import errors, tests
20
 
from ..bzr import (
21
 
    pack,
22
 
    )
23
 
from ..sixish import (
24
 
    BytesIO,
25
 
    )
26
 
 
27
 
 
28
 
class TestContainerSerialiser(tests.TestCase):
29
 
    """Tests for the ContainerSerialiser class."""
30
 
 
31
 
    def test_construct(self):
32
 
        """Test constructing a ContainerSerialiser."""
33
 
        pack.ContainerSerialiser()
34
 
 
35
 
    def test_begin(self):
36
 
        serialiser = pack.ContainerSerialiser()
37
 
        self.assertEqual(b'Bazaar pack format 1 (introduced in 0.18)\n',
38
 
                         serialiser.begin())
39
 
 
40
 
    def test_end(self):
41
 
        serialiser = pack.ContainerSerialiser()
42
 
        self.assertEqual(b'E', serialiser.end())
43
 
 
44
 
    def test_bytes_record_no_name(self):
45
 
        serialiser = pack.ContainerSerialiser()
46
 
        record = serialiser.bytes_record(b'bytes', [])
47
 
        self.assertEqual(b'B5\n\nbytes', record)
48
 
 
49
 
    def test_bytes_record_one_name_with_one_part(self):
50
 
        serialiser = pack.ContainerSerialiser()
51
 
        record = serialiser.bytes_record(b'bytes', [(b'name',)])
52
 
        self.assertEqual(b'B5\nname\n\nbytes', record)
53
 
 
54
 
    def test_bytes_record_one_name_with_two_parts(self):
55
 
        serialiser = pack.ContainerSerialiser()
56
 
        record = serialiser.bytes_record(b'bytes', [(b'part1', b'part2')])
57
 
        self.assertEqual(b'B5\npart1\x00part2\n\nbytes', record)
58
 
 
59
 
    def test_bytes_record_two_names(self):
60
 
        serialiser = pack.ContainerSerialiser()
61
 
        record = serialiser.bytes_record(b'bytes', [(b'name1',), (b'name2',)])
62
 
        self.assertEqual(b'B5\nname1\nname2\n\nbytes', record)
63
 
 
64
 
    def test_bytes_record_whitespace_in_name_part(self):
65
 
        serialiser = pack.ContainerSerialiser()
66
 
        self.assertRaises(
67
 
            errors.InvalidRecordError,
68
 
            serialiser.bytes_record, b'bytes', [(b'bad name',)])
69
 
 
70
 
    def test_bytes_record_header(self):
71
 
        serialiser = pack.ContainerSerialiser()
72
 
        record = serialiser.bytes_header(32, [(b'name1',), (b'name2',)])
73
 
        self.assertEqual(b'B32\nname1\nname2\n\n', record)
74
 
 
75
 
 
76
 
class TestContainerWriter(tests.TestCase):
77
 
 
78
 
    def setUp(self):
79
 
        super(TestContainerWriter, self).setUp()
80
 
        self.output = BytesIO()
81
 
        self.writer = pack.ContainerWriter(self.output.write)
82
 
 
83
 
    def assertOutput(self, expected_output):
84
 
        """Assert that the output of self.writer ContainerWriter is equal to
85
 
        expected_output.
86
 
        """
87
 
        self.assertEqual(expected_output, self.output.getvalue())
88
 
 
89
 
    def test_construct(self):
90
 
        """Test constructing a ContainerWriter.
91
 
 
92
 
        This uses None as the output stream to show that the constructor
93
 
        doesn't try to use the output stream.
94
 
        """
95
 
        pack.ContainerWriter(None)
96
 
 
97
 
    def test_begin(self):
98
 
        """The begin() method writes the container format marker line."""
99
 
        self.writer.begin()
100
 
        self.assertOutput(b'Bazaar pack format 1 (introduced in 0.18)\n')
101
 
 
102
 
    def test_zero_records_written_after_begin(self):
103
 
        """After begin is written, 0 records have been written."""
104
 
        self.writer.begin()
105
 
        self.assertEqual(0, self.writer.records_written)
106
 
 
107
 
    def test_end(self):
108
 
        """The end() method writes an End Marker record."""
109
 
        self.writer.begin()
110
 
        self.writer.end()
111
 
        self.assertOutput(b'Bazaar pack format 1 (introduced in 0.18)\nE')
112
 
 
113
 
    def test_empty_end_does_not_add_a_record_to_records_written(self):
114
 
        """The end() method does not count towards the records written."""
115
 
        self.writer.begin()
116
 
        self.writer.end()
117
 
        self.assertEqual(0, self.writer.records_written)
118
 
 
119
 
    def test_non_empty_end_does_not_add_a_record_to_records_written(self):
120
 
        """The end() method does not count towards the records written."""
121
 
        self.writer.begin()
122
 
        self.writer.add_bytes_record([b'foo'], len(b'foo'), names=[])
123
 
        self.writer.end()
124
 
        self.assertEqual(1, self.writer.records_written)
125
 
 
126
 
    def test_add_bytes_record_no_name(self):
127
 
        """Add a bytes record with no name."""
128
 
        self.writer.begin()
129
 
        offset, length = self.writer.add_bytes_record([b'abc'], len(b'abc'), names=[])
130
 
        self.assertEqual((42, 7), (offset, length))
131
 
        self.assertOutput(
132
 
            b'Bazaar pack format 1 (introduced in 0.18)\nB3\n\nabc')
133
 
 
134
 
    def test_add_bytes_record_one_name(self):
135
 
        """Add a bytes record with one name."""
136
 
        self.writer.begin()
137
 
 
138
 
        offset, length = self.writer.add_bytes_record(
139
 
            [b'abc'], len(b'abc'), names=[(b'name1', )])
140
 
        self.assertEqual((42, 13), (offset, length))
141
 
        self.assertOutput(
142
 
            b'Bazaar pack format 1 (introduced in 0.18)\n'
143
 
            b'B3\nname1\n\nabc')
144
 
 
145
 
    def test_add_bytes_record_split_writes(self):
146
 
        """Write a large record which does multiple IOs"""
147
 
 
148
 
        writes = []
149
 
        real_write = self.writer.write_func
150
 
 
151
 
        def record_writes(data):
152
 
            writes.append(data)
153
 
            return real_write(data)
154
 
 
155
 
        self.writer.write_func = record_writes
156
 
        self.writer._JOIN_WRITES_THRESHOLD = 2
157
 
 
158
 
        self.writer.begin()
159
 
        offset, length = self.writer.add_bytes_record(
160
 
            [b'abcabc'], len(b'abcabc'), names=[(b'name1', )])
161
 
        self.assertEqual((42, 16), (offset, length))
162
 
        self.assertOutput(
163
 
            b'Bazaar pack format 1 (introduced in 0.18)\n'
164
 
            b'B6\nname1\n\nabcabc')
165
 
 
166
 
        self.assertEqual([
167
 
            b'Bazaar pack format 1 (introduced in 0.18)\n',
168
 
            b'B6\nname1\n\n',
169
 
            b'abcabc'],
170
 
            writes)
171
 
 
172
 
    def test_add_bytes_record_two_names(self):
173
 
        """Add a bytes record with two names."""
174
 
        self.writer.begin()
175
 
        offset, length = self.writer.add_bytes_record(
176
 
            [b'abc'], len(b'abc'), names=[(b'name1', ), (b'name2', )])
177
 
        self.assertEqual((42, 19), (offset, length))
178
 
        self.assertOutput(
179
 
            b'Bazaar pack format 1 (introduced in 0.18)\n'
180
 
            b'B3\nname1\nname2\n\nabc')
181
 
 
182
 
    def test_add_bytes_record_two_names(self):
183
 
        """Add a bytes record with two names."""
184
 
        self.writer.begin()
185
 
        offset, length = self.writer.add_bytes_record(
186
 
            [b'abc'], len(b'abc'), names=[(b'name1', ), (b'name2', )])
187
 
        self.assertEqual((42, 19), (offset, length))
188
 
        self.assertOutput(
189
 
            b'Bazaar pack format 1 (introduced in 0.18)\n'
190
 
            b'B3\nname1\nname2\n\nabc')
191
 
 
192
 
    def test_add_bytes_record_two_element_name(self):
193
 
        """Add a bytes record with a two-element name."""
194
 
        self.writer.begin()
195
 
        offset, length = self.writer.add_bytes_record(
196
 
            [b'abc'], len(b'abc'), names=[(b'name1', b'name2')])
197
 
        self.assertEqual((42, 19), (offset, length))
198
 
        self.assertOutput(
199
 
            b'Bazaar pack format 1 (introduced in 0.18)\n'
200
 
            b'B3\nname1\x00name2\n\nabc')
201
 
 
202
 
    def test_add_second_bytes_record_gets_higher_offset(self):
203
 
        self.writer.begin()
204
 
        self.writer.add_bytes_record([b'a', b'bc'], len(b'abc'), names=[])
205
 
        offset, length = self.writer.add_bytes_record([b'abc'], len(b'abc'), names=[])
206
 
        self.assertEqual((49, 7), (offset, length))
207
 
        self.assertOutput(
208
 
            b'Bazaar pack format 1 (introduced in 0.18)\n'
209
 
            b'B3\n\nabc'
210
 
            b'B3\n\nabc')
211
 
 
212
 
    def test_add_bytes_record_invalid_name(self):
213
 
        """Adding a Bytes record with a name with whitespace in it raises
214
 
        InvalidRecordError.
215
 
        """
216
 
        self.writer.begin()
217
 
        self.assertRaises(
218
 
            errors.InvalidRecordError,
219
 
            self.writer.add_bytes_record, [b'abc'], len(b'abc'), names=[(b'bad name', )])
220
 
 
221
 
    def test_add_bytes_records_add_to_records_written(self):
222
 
        """Adding a Bytes record increments the records_written counter."""
223
 
        self.writer.begin()
224
 
        self.writer.add_bytes_record([b'foo'], len(b'foo'), names=[])
225
 
        self.assertEqual(1, self.writer.records_written)
226
 
        self.writer.add_bytes_record([b'foo'], len(b'foo'), names=[])
227
 
        self.assertEqual(2, self.writer.records_written)
228
 
 
229
 
 
230
 
class TestContainerReader(tests.TestCase):
231
 
    """Tests for the ContainerReader.
232
 
 
233
 
    The ContainerReader reads format 1 containers, so these tests explicitly
234
 
    test how it reacts to format 1 data.  If a new version of the format is
235
 
    added, then separate tests for that format should be added.
236
 
    """
237
 
 
238
 
    def get_reader_for(self, data):
239
 
        stream = BytesIO(data)
240
 
        reader = pack.ContainerReader(stream)
241
 
        return reader
242
 
 
243
 
    def test_construct(self):
244
 
        """Test constructing a ContainerReader.
245
 
 
246
 
        This uses None as the output stream to show that the constructor
247
 
        doesn't try to use the input stream.
248
 
        """
249
 
        pack.ContainerReader(None)
250
 
 
251
 
    def test_empty_container(self):
252
 
        """Read an empty container."""
253
 
        reader = self.get_reader_for(
254
 
            b"Bazaar pack format 1 (introduced in 0.18)\nE")
255
 
        self.assertEqual([], list(reader.iter_records()))
256
 
 
257
 
    def test_unknown_format(self):
258
 
        """Unrecognised container formats raise UnknownContainerFormatError."""
259
 
        reader = self.get_reader_for(b"unknown format\n")
260
 
        self.assertRaises(
261
 
            errors.UnknownContainerFormatError, reader.iter_records)
262
 
 
263
 
    def test_unexpected_end_of_container(self):
264
 
        """Containers that don't end with an End Marker record should cause
265
 
        UnexpectedEndOfContainerError to be raised.
266
 
        """
267
 
        reader = self.get_reader_for(
268
 
            b"Bazaar pack format 1 (introduced in 0.18)\n")
269
 
        iterator = reader.iter_records()
270
 
        self.assertRaises(
271
 
            errors.UnexpectedEndOfContainerError, next, iterator)
272
 
 
273
 
    def test_unknown_record_type(self):
274
 
        """Unknown record types cause UnknownRecordTypeError to be raised."""
275
 
        reader = self.get_reader_for(
276
 
            b"Bazaar pack format 1 (introduced in 0.18)\nX")
277
 
        iterator = reader.iter_records()
278
 
        self.assertRaises(
279
 
            errors.UnknownRecordTypeError, next, iterator)
280
 
 
281
 
    def test_container_with_one_unnamed_record(self):
282
 
        """Read a container with one Bytes record.
283
 
 
284
 
        Parsing Bytes records is more thoroughly exercised by
285
 
        TestBytesRecordReader.  This test is here to ensure that
286
 
        ContainerReader's integration with BytesRecordReader is working.
287
 
        """
288
 
        reader = self.get_reader_for(
289
 
            b"Bazaar pack format 1 (introduced in 0.18)\nB5\n\naaaaaE")
290
 
        expected_records = [([], b'aaaaa')]
291
 
        self.assertEqual(
292
 
            expected_records,
293
 
            [(names, read_bytes(None))
294
 
             for (names, read_bytes) in reader.iter_records()])
295
 
 
296
 
    def test_validate_empty_container(self):
297
 
        """validate does not raise an error for a container with no records."""
298
 
        reader = self.get_reader_for(
299
 
            b"Bazaar pack format 1 (introduced in 0.18)\nE")
300
 
        # No exception raised
301
 
        reader.validate()
302
 
 
303
 
    def test_validate_non_empty_valid_container(self):
304
 
        """validate does not raise an error for a container with a valid record.
305
 
        """
306
 
        reader = self.get_reader_for(
307
 
            b"Bazaar pack format 1 (introduced in 0.18)\nB3\nname\n\nabcE")
308
 
        # No exception raised
309
 
        reader.validate()
310
 
 
311
 
    def test_validate_bad_format(self):
312
 
        """validate raises an error for unrecognised format strings.
313
 
 
314
 
        It may raise either UnexpectedEndOfContainerError or
315
 
        UnknownContainerFormatError, depending on exactly what the string is.
316
 
        """
317
 
        inputs = [
318
 
            b"", b"x", b"Bazaar pack format 1 (introduced in 0.18)", b"bad\n"]
319
 
        for input in inputs:
320
 
            reader = self.get_reader_for(input)
321
 
            self.assertRaises(
322
 
                (errors.UnexpectedEndOfContainerError,
323
 
                 errors.UnknownContainerFormatError),
324
 
                reader.validate)
325
 
 
326
 
    def test_validate_bad_record_marker(self):
327
 
        """validate raises UnknownRecordTypeError for unrecognised record
328
 
        types.
329
 
        """
330
 
        reader = self.get_reader_for(
331
 
            b"Bazaar pack format 1 (introduced in 0.18)\nX")
332
 
        self.assertRaises(errors.UnknownRecordTypeError, reader.validate)
333
 
 
334
 
    def test_validate_data_after_end_marker(self):
335
 
        """validate raises ContainerHasExcessDataError if there are any bytes
336
 
        after the end of the container.
337
 
        """
338
 
        reader = self.get_reader_for(
339
 
            b"Bazaar pack format 1 (introduced in 0.18)\nEcrud")
340
 
        self.assertRaises(
341
 
            errors.ContainerHasExcessDataError, reader.validate)
342
 
 
343
 
    def test_validate_no_end_marker(self):
344
 
        """validate raises UnexpectedEndOfContainerError if there's no end of
345
 
        container marker, even if the container up to this point has been
346
 
        valid.
347
 
        """
348
 
        reader = self.get_reader_for(
349
 
            b"Bazaar pack format 1 (introduced in 0.18)\n")
350
 
        self.assertRaises(
351
 
            errors.UnexpectedEndOfContainerError, reader.validate)
352
 
 
353
 
    def test_validate_duplicate_name(self):
354
 
        """validate raises DuplicateRecordNameError if the same name occurs
355
 
        multiple times in the container.
356
 
        """
357
 
        reader = self.get_reader_for(
358
 
            b"Bazaar pack format 1 (introduced in 0.18)\n"
359
 
            b"B0\nname\n\n"
360
 
            b"B0\nname\n\n"
361
 
            b"E")
362
 
        self.assertRaises(errors.DuplicateRecordNameError, reader.validate)
363
 
 
364
 
    def test_validate_undecodeable_name(self):
365
 
        """Names that aren't valid UTF-8 cause validate to fail."""
366
 
        reader = self.get_reader_for(
367
 
            b"Bazaar pack format 1 (introduced in 0.18)\nB0\n\xcc\n\nE")
368
 
        self.assertRaises(errors.InvalidRecordError, reader.validate)
369
 
 
370
 
 
371
 
class TestBytesRecordReader(tests.TestCase):
372
 
    """Tests for reading and validating Bytes records with
373
 
    BytesRecordReader.
374
 
 
375
 
    Like TestContainerReader, this explicitly tests the reading of format 1
376
 
    data.  If a new version of the format is added, then a separate set of
377
 
    tests for reading that format should be added.
378
 
    """
379
 
 
380
 
    def get_reader_for(self, data):
381
 
        stream = BytesIO(data)
382
 
        reader = pack.BytesRecordReader(stream)
383
 
        return reader
384
 
 
385
 
    def test_record_with_no_name(self):
386
 
        """Reading a Bytes record with no name returns an empty list of
387
 
        names.
388
 
        """
389
 
        reader = self.get_reader_for(b"5\n\naaaaa")
390
 
        names, get_bytes = reader.read()
391
 
        self.assertEqual([], names)
392
 
        self.assertEqual(b'aaaaa', get_bytes(None))
393
 
 
394
 
    def test_record_with_one_name(self):
395
 
        """Reading a Bytes record with one name returns a list of just that
396
 
        name.
397
 
        """
398
 
        reader = self.get_reader_for(b"5\nname1\n\naaaaa")
399
 
        names, get_bytes = reader.read()
400
 
        self.assertEqual([(b'name1', )], names)
401
 
        self.assertEqual(b'aaaaa', get_bytes(None))
402
 
 
403
 
    def test_record_with_two_names(self):
404
 
        """Reading a Bytes record with two names returns a list of both names.
405
 
        """
406
 
        reader = self.get_reader_for(b"5\nname1\nname2\n\naaaaa")
407
 
        names, get_bytes = reader.read()
408
 
        self.assertEqual([(b'name1', ), (b'name2', )], names)
409
 
        self.assertEqual(b'aaaaa', get_bytes(None))
410
 
 
411
 
    def test_record_with_two_part_names(self):
412
 
        """Reading a Bytes record with a two_part name reads both."""
413
 
        reader = self.get_reader_for(b"5\nname1\x00name2\n\naaaaa")
414
 
        names, get_bytes = reader.read()
415
 
        self.assertEqual([(b'name1', b'name2', )], names)
416
 
        self.assertEqual(b'aaaaa', get_bytes(None))
417
 
 
418
 
    def test_invalid_length(self):
419
 
        """If the length-prefix is not a number, parsing raises
420
 
        InvalidRecordError.
421
 
        """
422
 
        reader = self.get_reader_for(b"not a number\n")
423
 
        self.assertRaises(errors.InvalidRecordError, reader.read)
424
 
 
425
 
    def test_early_eof(self):
426
 
        """Tests for premature EOF occuring during parsing Bytes records with
427
 
        BytesRecordReader.
428
 
 
429
 
        A incomplete container might be interrupted at any point.  The
430
 
        BytesRecordReader needs to cope with the input stream running out no
431
 
        matter where it is in the parsing process.
432
 
 
433
 
        In all cases, UnexpectedEndOfContainerError should be raised.
434
 
        """
435
 
        complete_record = b"6\nname\n\nabcdef"
436
 
        for count in range(0, len(complete_record)):
437
 
            incomplete_record = complete_record[:count]
438
 
            reader = self.get_reader_for(incomplete_record)
439
 
            # We don't use assertRaises to make diagnosing failures easier
440
 
            # (assertRaises doesn't allow a custom failure message).
441
 
            try:
442
 
                names, read_bytes = reader.read()
443
 
                read_bytes(None)
444
 
            except errors.UnexpectedEndOfContainerError:
445
 
                pass
446
 
            else:
447
 
                self.fail(
448
 
                    "UnexpectedEndOfContainerError not raised when parsing %r"
449
 
                    % (incomplete_record,))
450
 
 
451
 
    def test_initial_eof(self):
452
 
        """EOF before any bytes read at all."""
453
 
        reader = self.get_reader_for(b"")
454
 
        self.assertRaises(errors.UnexpectedEndOfContainerError, reader.read)
455
 
 
456
 
    def test_eof_after_length(self):
457
 
        """EOF after reading the length and before reading name(s)."""
458
 
        reader = self.get_reader_for(b"123\n")
459
 
        self.assertRaises(errors.UnexpectedEndOfContainerError, reader.read)
460
 
 
461
 
    def test_eof_during_name(self):
462
 
        """EOF during reading a name."""
463
 
        reader = self.get_reader_for(b"123\nname")
464
 
        self.assertRaises(errors.UnexpectedEndOfContainerError, reader.read)
465
 
 
466
 
    def test_read_invalid_name_whitespace(self):
467
 
        """Names must have no whitespace."""
468
 
        # A name with a space.
469
 
        reader = self.get_reader_for(b"0\nbad name\n\n")
470
 
        self.assertRaises(errors.InvalidRecordError, reader.read)
471
 
 
472
 
        # A name with a tab.
473
 
        reader = self.get_reader_for(b"0\nbad\tname\n\n")
474
 
        self.assertRaises(errors.InvalidRecordError, reader.read)
475
 
 
476
 
        # A name with a vertical tab.
477
 
        reader = self.get_reader_for(b"0\nbad\vname\n\n")
478
 
        self.assertRaises(errors.InvalidRecordError, reader.read)
479
 
 
480
 
    def test_validate_whitespace_in_name(self):
481
 
        """Names must have no whitespace."""
482
 
        reader = self.get_reader_for(b"0\nbad name\n\n")
483
 
        self.assertRaises(errors.InvalidRecordError, reader.validate)
484
 
 
485
 
    def test_validate_interrupted_prelude(self):
486
 
        """EOF during reading a record's prelude causes validate to fail."""
487
 
        reader = self.get_reader_for(b"")
488
 
        self.assertRaises(
489
 
            errors.UnexpectedEndOfContainerError, reader.validate)
490
 
 
491
 
    def test_validate_interrupted_body(self):
492
 
        """EOF during reading a record's body causes validate to fail."""
493
 
        reader = self.get_reader_for(b"1\n\n")
494
 
        self.assertRaises(
495
 
            errors.UnexpectedEndOfContainerError, reader.validate)
496
 
 
497
 
    def test_validate_unparseable_length(self):
498
 
        """An unparseable record length causes validate to fail."""
499
 
        reader = self.get_reader_for(b"\n\n")
500
 
        self.assertRaises(
501
 
            errors.InvalidRecordError, reader.validate)
502
 
 
503
 
    def test_validate_undecodeable_name(self):
504
 
        """Names that aren't valid UTF-8 cause validate to fail."""
505
 
        reader = self.get_reader_for(b"0\n\xcc\n\n")
506
 
        self.assertRaises(errors.InvalidRecordError, reader.validate)
507
 
 
508
 
    def test_read_max_length(self):
509
 
        """If the max_length passed to the callable returned by read is not
510
 
        None, then no more than that many bytes will be read.
511
 
        """
512
 
        reader = self.get_reader_for(b"6\n\nabcdef")
513
 
        names, get_bytes = reader.read()
514
 
        self.assertEqual(b'abc', get_bytes(3))
515
 
 
516
 
    def test_read_no_max_length(self):
517
 
        """If the max_length passed to the callable returned by read is None,
518
 
        then all the bytes in the record will be read.
519
 
        """
520
 
        reader = self.get_reader_for(b"6\n\nabcdef")
521
 
        names, get_bytes = reader.read()
522
 
        self.assertEqual(b'abcdef', get_bytes(None))
523
 
 
524
 
    def test_repeated_read_calls(self):
525
 
        """Repeated calls to the callable returned from BytesRecordReader.read
526
 
        will not read beyond the end of the record.
527
 
        """
528
 
        reader = self.get_reader_for(b"6\n\nabcdefB3\nnext-record\nXXX")
529
 
        names, get_bytes = reader.read()
530
 
        self.assertEqual(b'abcdef', get_bytes(None))
531
 
        self.assertEqual(b'', get_bytes(None))
532
 
        self.assertEqual(b'', get_bytes(99))
533
 
 
534
 
 
535
 
class TestMakeReadvReader(tests.TestCaseWithTransport):
536
 
 
537
 
    def test_read_skipping_records(self):
538
 
        pack_data = BytesIO()
539
 
        writer = pack.ContainerWriter(pack_data.write)
540
 
        writer.begin()
541
 
        memos = []
542
 
        memos.append(writer.add_bytes_record([b'abc'], 3, names=[]))
543
 
        memos.append(writer.add_bytes_record([b'def'], 3, names=[(b'name1', )]))
544
 
        memos.append(writer.add_bytes_record([b'ghi'], 3, names=[(b'name2', )]))
545
 
        memos.append(writer.add_bytes_record([b'jkl'], 3, names=[]))
546
 
        writer.end()
547
 
        transport = self.get_transport()
548
 
        transport.put_bytes('mypack', pack_data.getvalue())
549
 
        requested_records = [memos[0], memos[2]]
550
 
        reader = pack.make_readv_reader(transport, 'mypack', requested_records)
551
 
        result = []
552
 
        for names, reader_func in reader.iter_records():
553
 
            result.append((names, reader_func(None)))
554
 
        self.assertEqual([([], b'abc'), ([(b'name2', )], b'ghi')], result)
555
 
 
556
 
 
557
 
class TestReadvFile(tests.TestCaseWithTransport):
558
 
    """Tests of the ReadVFile class.
559
 
 
560
 
    Error cases are deliberately undefined: this code adapts the underlying
561
 
    transport interface to a single 'streaming read' interface as
562
 
    ContainerReader needs.
563
 
    """
564
 
 
565
 
    def test_read_bytes(self):
566
 
        """Test reading of both single bytes and all bytes in a hunk."""
567
 
        transport = self.get_transport()
568
 
        transport.put_bytes('sample', b'0123456789')
569
 
        f = pack.ReadVFile(transport.readv(
570
 
            'sample', [(0, 1), (1, 2), (4, 1), (6, 2)]))
571
 
        results = []
572
 
        results.append(f.read(1))
573
 
        results.append(f.read(2))
574
 
        results.append(f.read(1))
575
 
        results.append(f.read(1))
576
 
        results.append(f.read(1))
577
 
        self.assertEqual([b'0', b'12', b'4', b'6', b'7'], results)
578
 
 
579
 
    def test_readline(self):
580
 
        """Test using readline() as ContainerReader does.
581
 
 
582
 
        This is always within a readv hunk, never across it.
583
 
        """
584
 
        transport = self.get_transport()
585
 
        transport.put_bytes('sample', b'0\n2\n4\n')
586
 
        f = pack.ReadVFile(transport.readv('sample', [(0, 2), (2, 4)]))
587
 
        results = []
588
 
        results.append(f.readline())
589
 
        results.append(f.readline())
590
 
        results.append(f.readline())
591
 
        self.assertEqual([b'0\n', b'2\n', b'4\n'], results)
592
 
 
593
 
    def test_readline_and_read(self):
594
 
        """Test exercising one byte reads, readline, and then read again."""
595
 
        transport = self.get_transport()
596
 
        transport.put_bytes('sample', b'0\n2\n4\n')
597
 
        f = pack.ReadVFile(transport.readv('sample', [(0, 6)]))
598
 
        results = []
599
 
        results.append(f.read(1))
600
 
        results.append(f.readline())
601
 
        results.append(f.read(4))
602
 
        self.assertEqual([b'0', b'\n', b'2\n4\n'], results)
603
 
 
604
 
 
605
 
class PushParserTestCase(tests.TestCase):
606
 
    """Base class for TestCases involving ContainerPushParser."""
607
 
 
608
 
    def make_parser_expecting_record_type(self):
609
 
        parser = pack.ContainerPushParser()
610
 
        parser.accept_bytes(b"Bazaar pack format 1 (introduced in 0.18)\n")
611
 
        return parser
612
 
 
613
 
    def make_parser_expecting_bytes_record(self):
614
 
        parser = pack.ContainerPushParser()
615
 
        parser.accept_bytes(b"Bazaar pack format 1 (introduced in 0.18)\nB")
616
 
        return parser
617
 
 
618
 
    def assertRecordParsing(self, expected_record, data):
619
 
        """Assert that 'bytes' is parsed as a given bytes record.
620
 
 
621
 
        :param expected_record: A tuple of (names, bytes).
622
 
        """
623
 
        parser = self.make_parser_expecting_bytes_record()
624
 
        parser.accept_bytes(data)
625
 
        parsed_records = parser.read_pending_records()
626
 
        self.assertEqual([expected_record], parsed_records)
627
 
 
628
 
 
629
 
class TestContainerPushParser(PushParserTestCase):
630
 
    """Tests for ContainerPushParser.
631
 
 
632
 
    The ContainerPushParser reads format 1 containers, so these tests
633
 
    explicitly test how it reacts to format 1 data.  If a new version of the
634
 
    format is added, then separate tests for that format should be added.
635
 
    """
636
 
 
637
 
    def test_construct(self):
638
 
        """ContainerPushParser can be constructed."""
639
 
        pack.ContainerPushParser()
640
 
 
641
 
    def test_multiple_records_at_once(self):
642
 
        """If multiple records worth of data are fed to the parser in one
643
 
        string, the parser will correctly parse all the records.
644
 
 
645
 
        (A naive implementation might stop after parsing the first record.)
646
 
        """
647
 
        parser = self.make_parser_expecting_record_type()
648
 
        parser.accept_bytes(b"B5\nname1\n\nbody1B5\nname2\n\nbody2")
649
 
        self.assertEqual(
650
 
            [([(b'name1',)], b'body1'), ([(b'name2',)], b'body2')],
651
 
            parser.read_pending_records())
652
 
 
653
 
    def test_multiple_empty_records_at_once(self):
654
 
        """If multiple empty records worth of data are fed to the parser in one
655
 
        string, the parser will correctly parse all the records.
656
 
 
657
 
        (A naive implementation might stop after parsing the first empty
658
 
        record, because the buffer size had not changed.)
659
 
        """
660
 
        parser = self.make_parser_expecting_record_type()
661
 
        parser.accept_bytes(b"B0\nname1\n\nB0\nname2\n\n")
662
 
        self.assertEqual(
663
 
            [([(b'name1',)], b''), ([(b'name2',)], b'')],
664
 
            parser.read_pending_records())
665
 
 
666
 
 
667
 
class TestContainerPushParserBytesParsing(PushParserTestCase):
668
 
    """Tests for reading Bytes records with ContainerPushParser.
669
 
 
670
 
    The ContainerPushParser reads format 1 containers, so these tests
671
 
    explicitly test how it reacts to format 1 data.  If a new version of the
672
 
    format is added, then separate tests for that format should be added.
673
 
    """
674
 
 
675
 
    def test_record_with_no_name(self):
676
 
        """Reading a Bytes record with no name returns an empty list of
677
 
        names.
678
 
        """
679
 
        self.assertRecordParsing(([], b'aaaaa'), b"5\n\naaaaa")
680
 
 
681
 
    def test_record_with_one_name(self):
682
 
        """Reading a Bytes record with one name returns a list of just that
683
 
        name.
684
 
        """
685
 
        self.assertRecordParsing(
686
 
            ([(b'name1', )], b'aaaaa'),
687
 
            b"5\nname1\n\naaaaa")
688
 
 
689
 
    def test_record_with_two_names(self):
690
 
        """Reading a Bytes record with two names returns a list of both names.
691
 
        """
692
 
        self.assertRecordParsing(
693
 
            ([(b'name1', ), (b'name2', )], b'aaaaa'),
694
 
            b"5\nname1\nname2\n\naaaaa")
695
 
 
696
 
    def test_record_with_two_part_names(self):
697
 
        """Reading a Bytes record with a two_part name reads both."""
698
 
        self.assertRecordParsing(
699
 
            ([(b'name1', b'name2')], b'aaaaa'),
700
 
            b"5\nname1\x00name2\n\naaaaa")
701
 
 
702
 
    def test_invalid_length(self):
703
 
        """If the length-prefix is not a number, parsing raises
704
 
        InvalidRecordError.
705
 
        """
706
 
        parser = self.make_parser_expecting_bytes_record()
707
 
        self.assertRaises(
708
 
            errors.InvalidRecordError, parser.accept_bytes, b"not a number\n")
709
 
 
710
 
    def test_incomplete_record(self):
711
 
        """If the bytes seen so far don't form a complete record, then there
712
 
        will be nothing returned by read_pending_records.
713
 
        """
714
 
        parser = self.make_parser_expecting_bytes_record()
715
 
        parser.accept_bytes(b"5\n\nabcd")
716
 
        self.assertEqual([], parser.read_pending_records())
717
 
 
718
 
    def test_accept_nothing(self):
719
 
        """The edge case of parsing an empty string causes no error."""
720
 
        parser = self.make_parser_expecting_bytes_record()
721
 
        parser.accept_bytes(b"")
722
 
 
723
 
    def assertInvalidRecord(self, data):
724
 
        """Assert that parsing the given bytes raises InvalidRecordError."""
725
 
        parser = self.make_parser_expecting_bytes_record()
726
 
        self.assertRaises(
727
 
            errors.InvalidRecordError, parser.accept_bytes, data)
728
 
 
729
 
    def test_read_invalid_name_whitespace(self):
730
 
        """Names must have no whitespace."""
731
 
        # A name with a space.
732
 
        self.assertInvalidRecord(b"0\nbad name\n\n")
733
 
 
734
 
        # A name with a tab.
735
 
        self.assertInvalidRecord(b"0\nbad\tname\n\n")
736
 
 
737
 
        # A name with a vertical tab.
738
 
        self.assertInvalidRecord(b"0\nbad\vname\n\n")
739
 
 
740
 
    def test_repeated_read_pending_records(self):
741
 
        """read_pending_records will not return the same record twice."""
742
 
        parser = self.make_parser_expecting_bytes_record()
743
 
        parser.accept_bytes(b"6\n\nabcdef")
744
 
        self.assertEqual([([], b'abcdef')], parser.read_pending_records())
745
 
        self.assertEqual([], parser.read_pending_records())