/brz/remove-bazaar

To get this branch, use:
bzr branch http://gegoxaren.bato24.eu/bzr/brz/remove-bazaar

« back to all changes in this revision

Viewing changes to bzrlib/tests/test_pack.py

  • Committer: Aaron Bentley
  • Date: 2007-08-14 23:35:48 UTC
  • mfrom: (2698 +trunk)
  • mto: This revision was merged to the branch mainline in revision 2699.
  • Revision ID: aaron.bentley@utoronto.ca-20070814233548-ctlr8sb1lcufb3ny
Merge bzr.dev

Show diffs side-by-side

added

removed

Lines of Context:
54
54
        output = StringIO()
55
55
        writer = pack.ContainerWriter(output.write)
56
56
        writer.begin()
57
 
        writer.add_bytes_record('abc', names=[])
 
57
        offset, length = writer.add_bytes_record('abc', names=[])
 
58
        self.assertEqual((42, 7), (offset, length))
58
59
        self.assertEqual('Bazaar pack format 1 (introduced in 0.18)\nB3\n\nabc',
59
60
                         output.getvalue())
60
61
 
63
64
        output = StringIO()
64
65
        writer = pack.ContainerWriter(output.write)
65
66
        writer.begin()
66
 
        writer.add_bytes_record('abc', names=['name1'])
 
67
        offset, length = writer.add_bytes_record('abc', names=[('name1', )])
 
68
        self.assertEqual((42, 13), (offset, length))
67
69
        self.assertEqual(
68
70
            'Bazaar pack format 1 (introduced in 0.18)\n'
69
71
            'B3\nname1\n\nabc',
74
76
        output = StringIO()
75
77
        writer = pack.ContainerWriter(output.write)
76
78
        writer.begin()
77
 
        writer.add_bytes_record('abc', names=['name1', 'name2'])
78
 
        self.assertEqual(
79
 
            'Bazaar pack format 1 (introduced in 0.18)\n'
80
 
            'B3\nname1\nname2\n\nabc',
 
79
        offset, length = writer.add_bytes_record('abc', names=[('name1', ), ('name2', )])
 
80
        self.assertEqual((42, 19), (offset, length))
 
81
        self.assertEqual(
 
82
            'Bazaar pack format 1 (introduced in 0.18)\n'
 
83
            'B3\nname1\nname2\n\nabc',
 
84
            output.getvalue())
 
85
 
 
86
    def test_add_bytes_record_two_names(self):
 
87
        """Add a bytes record with two names."""
 
88
        output = StringIO()
 
89
        writer = pack.ContainerWriter(output.write)
 
90
        writer.begin()
 
91
        offset, length = writer.add_bytes_record('abc', names=[('name1', ), ('name2', )])
 
92
        self.assertEqual((42, 19), (offset, length))
 
93
        self.assertEqual(
 
94
            'Bazaar pack format 1 (introduced in 0.18)\n'
 
95
            'B3\nname1\nname2\n\nabc',
 
96
            output.getvalue())
 
97
 
 
98
    def test_add_bytes_record_two_element_name(self):
 
99
        """Add a bytes record with a two-element name."""
 
100
        output = StringIO()
 
101
        writer = pack.ContainerWriter(output.write)
 
102
        writer.begin()
 
103
        offset, length = writer.add_bytes_record('abc', names=[('name1', 'name2')])
 
104
        self.assertEqual((42, 19), (offset, length))
 
105
        self.assertEqual(
 
106
            'Bazaar pack format 1 (introduced in 0.18)\n'
 
107
            'B3\nname1\x00name2\n\nabc',
 
108
            output.getvalue())
 
109
 
 
110
    def test_add_second_bytes_record_gets_higher_offset(self):
 
111
        output = StringIO()
 
112
        writer = pack.ContainerWriter(output.write)
 
113
        writer.begin()
 
114
        writer.add_bytes_record('abc', names=[])
 
115
        offset, length = writer.add_bytes_record('abc', names=[])
 
116
        self.assertEqual((49, 7), (offset, length))
 
117
        self.assertEqual(
 
118
            'Bazaar pack format 1 (introduced in 0.18)\n'
 
119
            'B3\n\nabc'
 
120
            'B3\n\nabc',
81
121
            output.getvalue())
82
122
 
83
123
    def test_add_bytes_record_invalid_name(self):
89
129
        writer.begin()
90
130
        self.assertRaises(
91
131
            errors.InvalidRecordError,
92
 
            writer.add_bytes_record, 'abc', names=['bad name'])
 
132
            writer.add_bytes_record, 'abc', names=[('bad name', )])
93
133
 
94
134
 
95
135
class TestContainerReader(tests.TestCase):
247
287
        """
248
288
        reader = self.get_reader_for("5\nname1\n\naaaaa")
249
289
        names, get_bytes = reader.read()
250
 
        self.assertEqual(['name1'], names)
 
290
        self.assertEqual([('name1', )], names)
251
291
        self.assertEqual('aaaaa', get_bytes(None))
252
292
 
253
293
    def test_record_with_two_names(self):
255
295
        """
256
296
        reader = self.get_reader_for("5\nname1\nname2\n\naaaaa")
257
297
        names, get_bytes = reader.read()
258
 
        self.assertEqual(['name1', 'name2'], names)
 
298
        self.assertEqual([('name1', ), ('name2', )], names)
 
299
        self.assertEqual('aaaaa', get_bytes(None))
 
300
 
 
301
    def test_record_with_two_part_names(self):
 
302
        """Reading a Bytes record with a two_part name reads both."""
 
303
        reader = self.get_reader_for("5\nname1\x00name2\n\naaaaa")
 
304
        names, get_bytes = reader.read()
 
305
        self.assertEqual([('name1', 'name2', )], names)
259
306
        self.assertEqual('aaaaa', get_bytes(None))
260
307
 
261
308
    def test_invalid_length(self):
375
422
        self.assertEqual('', get_bytes(99))
376
423
 
377
424
 
 
425
class TestMakeReadvReader(tests.TestCaseWithTransport):
 
426
 
 
427
    def test_read_skipping_records(self):
 
428
        pack_data = StringIO()
 
429
        writer = pack.ContainerWriter(pack_data.write)
 
430
        writer.begin()
 
431
        memos = []
 
432
        memos.append(writer.add_bytes_record('abc', names=[]))
 
433
        memos.append(writer.add_bytes_record('def', names=[('name1', )]))
 
434
        memos.append(writer.add_bytes_record('ghi', names=[('name2', )]))
 
435
        memos.append(writer.add_bytes_record('jkl', names=[]))
 
436
        writer.end()
 
437
        transport = self.get_transport()
 
438
        transport.put_bytes('mypack', pack_data.getvalue())
 
439
        requested_records = [memos[0], memos[2]]
 
440
        reader = pack.make_readv_reader(transport, 'mypack', requested_records)
 
441
        result = []
 
442
        for names, reader_func in reader.iter_records():
 
443
            result.append((names, reader_func(None)))
 
444
        self.assertEqual([([], 'abc'), ([('name2', )], 'ghi')], result)
 
445
 
 
446
 
 
447
class TestReadvFile(tests.TestCaseWithTransport):
 
448
    """Tests of the ReadVFile class.
 
449
 
 
450
    Error cases are deliberately undefined: this code adapts the underlying
 
451
    transport interface to a single 'streaming read' interface as 
 
452
    ContainerReader needs.
 
453
    """
 
454
 
 
455
    def test_read_bytes(self):
 
456
        """Test reading of both single bytes and all bytes in a hunk."""
 
457
        transport = self.get_transport()
 
458
        transport.put_bytes('sample', '0123456789')
 
459
        f = pack.ReadVFile(transport.readv('sample', [(0,1), (1,2), (4,1), (6,2)]))
 
460
        results = []
 
461
        results.append(f.read(1))
 
462
        results.append(f.read(2))
 
463
        results.append(f.read(1))
 
464
        results.append(f.read(1))
 
465
        results.append(f.read(1))
 
466
        self.assertEqual(['0', '12', '4', '6', '7'], results)
 
467
 
 
468
    def test_readline(self):
 
469
        """Test using readline() as ContainerReader does.
 
470
 
 
471
        This is always within a readv hunk, never across it.
 
472
        """
 
473
        transport = self.get_transport()
 
474
        transport.put_bytes('sample', '0\n2\n4\n')
 
475
        f = pack.ReadVFile(transport.readv('sample', [(0,2), (2,4)]))
 
476
        results = []
 
477
        results.append(f.readline())
 
478
        results.append(f.readline())
 
479
        results.append(f.readline())
 
480
        self.assertEqual(['0\n', '2\n', '4\n'], results)
 
481
 
 
482
    def test_readline_and_read(self):
 
483
        """Test exercising one byte reads, readline, and then read again."""
 
484
        transport = self.get_transport()
 
485
        transport.put_bytes('sample', '0\n2\n4\n')
 
486
        f = pack.ReadVFile(transport.readv('sample', [(0,6)]))
 
487
        results = []
 
488
        results.append(f.read(1))
 
489
        results.append(f.readline())
 
490
        results.append(f.read(4))
 
491
        self.assertEqual(['0', '\n', '2\n4\n'], results)