/brz/remove-bazaar

To get this branch, use:
bzr branch http://gegoxaren.bato24.eu/bzr/brz/remove-bazaar

« back to all changes in this revision

Viewing changes to bzrlib/tests/test_chunk_writer.py

  • Committer: John Arbash Meinel
  • Date: 2009-12-22 16:28:47 UTC
  • mto: This revision was merged to the branch mainline in revision 4922.
  • Revision ID: john@arbash-meinel.com-20091222162847-tvnsc69to4l4uf5r
Implement a permute_for_extension helper.

Use it for all of the 'simple' extension permutations.
It basically permutes all tests in the current module by setting TestCase.module,
which works well for most of our extension tests. Some had more advanced
handling of permutations (extra permutations, custom vars, etc.).

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
# Copyright (C) 2008 Canonical Ltd
 
2
#
 
3
# This program is free software; you can redistribute it and/or modify
 
4
# it under the terms of the GNU General Public License as published by
 
5
# the Free Software Foundation; either version 2 of the License, or
 
6
# (at your option) any later version.
 
7
#
 
8
# This program is distributed in the hope that it will be useful,
 
9
# but WITHOUT ANY WARRANTY; without even the implied warranty of
 
10
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
11
# GNU General Public License for more details.
 
12
#
 
13
# You should have received a copy of the GNU General Public License
 
14
# along with this program; if not, write to the Free Software
 
15
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
 
16
#
 
17
 
 
18
"""Tests for writing fixed size chunks with compression."""
 
19
 
 
20
import zlib
 
21
 
 
22
from bzrlib import chunk_writer
 
23
from bzrlib.tests import TestCaseWithTransport
 
24
 
 
25
 
 
26
class TestWriter(TestCaseWithTransport):
    """Tests for chunk_writer.ChunkWriter.

    Covers the empty chunk, the speed/size optimisation settings, a small
    write, and the behaviour once more data is offered than fits in a chunk.
    """

    def _make_number_lines(self, group_count=48, group_size=50):
        """Return newline-terminated lines of concatenated decimal integers.

        Line ``g`` is the digits of the ``group_size`` consecutive integers
        starting at ``g * group_size``, joined with no separator. The
        defaults reproduce the fixture the overflow tests below rely on
        (enough data to exceed 4K).

        :param group_count: number of lines to generate.
        :param group_size: how many consecutive integers go on each line.
        :return: a list of ``group_count`` strings.
        """
        lines = []
        for group in range(group_count):
            offset = group * group_size
            numbers = range(offset, offset + group_size)
            # Create a line with this group
            lines.append(''.join(map(str, numbers)) + '\n')
        return lines

    def check_chunk(self, bytes_list, size):
        """Assert the joined chunk is exactly ``size`` long and decompress it.

        :param bytes_list: sequence of strings as returned by
            ``ChunkWriter.finish()``.
        :param size: the expected total length of the joined strings.
        :return: the zlib-decompressed content of the joined chunk.
        """
        chunk = ''.join(bytes_list)
        self.assertEqual(size, len(chunk))
        return zlib.decompress(chunk)

    def test_chunk_writer_empty(self):
        # Finishing a writer that received no data still yields a full-size
        # chunk which decompresses to the empty string.
        writer = chunk_writer.ChunkWriter(4096)
        bytes_list, unused, padding = writer.finish()
        node_bytes = self.check_chunk(bytes_list, 4096)
        self.assertEqual("", node_bytes)
        self.assertEqual(None, unused)
        # Only a zlib header.
        self.assertEqual(4088, padding)

    def test_optimize_for_speed(self):
        # The speed options must be selected both via set_optimize() and via
        # the optimize_for_size constructor argument.
        expected = chunk_writer.ChunkWriter._repack_opts_for_speed
        writer = chunk_writer.ChunkWriter(4096)
        writer.set_optimize(for_size=False)
        self.assertEqual(expected, (writer._max_repack, writer._max_zsync))
        writer = chunk_writer.ChunkWriter(4096, optimize_for_size=False)
        self.assertEqual(expected, (writer._max_repack, writer._max_zsync))

    def test_optimize_for_size(self):
        # The size options must be selected both via set_optimize() and via
        # the optimize_for_size constructor argument.
        expected = chunk_writer.ChunkWriter._repack_opts_for_size
        writer = chunk_writer.ChunkWriter(4096)
        writer.set_optimize(for_size=True)
        self.assertEqual(expected, (writer._max_repack, writer._max_zsync))
        writer = chunk_writer.ChunkWriter(4096, optimize_for_size=True)
        self.assertEqual(expected, (writer._max_repack, writer._max_zsync))

    def test_some_data(self):
        # A single short write round-trips through the chunk unchanged.
        writer = chunk_writer.ChunkWriter(4096)
        writer.write("foo bar baz quux\n")
        bytes_list, unused, padding = writer.finish()
        node_bytes = self.check_chunk(bytes_list, 4096)
        self.assertEqual("foo bar baz quux\n", node_bytes)
        self.assertEqual(None, unused)
        # More than just the header..
        self.assertEqual(4073, padding)

    def test_too_much_data_does_not_exceed_size(self):
        # Generate enough data to exceed 4K
        lines = self._make_number_lines()
        writer = chunk_writer.ChunkWriter(4096)
        for idx, line in enumerate(lines):
            # A true result from write() is treated as "this line did not
            # fit"; that must first happen on line 46.
            if writer.write(line):
                self.assertEqual(46, idx)
                break
        else:
            # Without this guard a writer that accepted everything would
            # only surface later as a confusing content mismatch.
            self.fail('We were able to write all lines')
        bytes_list, unused, _ = writer.finish()
        node_bytes = self.check_chunk(bytes_list, 4096)
        # the first 46 lines should have been added
        expected_bytes = ''.join(lines[:46])
        self.assertEqualDiff(expected_bytes, node_bytes)
        # And the line that failed should have been saved for us
        self.assertEqual(lines[46], unused)

    def test_too_much_data_preserves_reserve_space(self):
        # Generate enough data to exceed 4K
        lines = self._make_number_lines()
        # NOTE(review): the second positional argument (256) is presumably
        # the amount of reserved space - confirm against ChunkWriter.
        writer = chunk_writer.ChunkWriter(4096, 256)
        for idx, line in enumerate(lines):
            # With space held back, the first refused line must be line 44.
            if writer.write(line):
                self.assertEqual(44, idx)
                break
        else:
            self.fail('We were able to write all lines')
        # A reserved write of 256 bytes must still be accepted.
        self.assertFalse(writer.write("A"*256, reserved=True))
        bytes_list, unused, _ = writer.finish()
        node_bytes = self.check_chunk(bytes_list, 4096)
        # the first 44 lines should have been added
        expected_bytes = ''.join(lines[:44]) + "A"*256
        self.assertEqualDiff(expected_bytes, node_bytes)
        # And the line that failed should have been saved for us
        self.assertEqual(lines[44], unused)