/brz/remove-bazaar

To get this branch, use:
bzr branch http://gegoxaren.bato24.eu/bzr/brz/remove-bazaar

« back to all changes in this revision

Viewing changes to bzrlib/tests/test_chunk_writer.py

  • Committer: Martin Pool
  • Date: 2007-10-03 08:06:44 UTC
  • mto: This revision was merged to the branch mainline in revision 2901.
  • Revision ID: mbp@sourcefrog.net-20071003080644-oivy0gkg98sex0ed
Avoid internal error tracebacks on failure to lock on readonly transport (#129701).

Add new LockFailed, which doesn't imply that we failed to get it because of
contention.  Raise this if we fail to create the pending or lock directories
because of Transport errors.

UnlockableTransport is not an internal error.

ReadOnlyLockError has a message which didn't match its name or usage; it's now
deprecated and callers are updated to use LockFailed which is more appropriate.

Add zero_ninetytwo deprecation symbol.

Unify assertMatchesRe with TestCase.assertContainsRe.

When the constructor is deprecated, just say that the class is deprecated, not
the __init__ method - this works better with applyDeprecated in tests.

Show diffs side-by-side

added added

removed removed

Lines of Context:
1
 
# Copyright (C) 2008 Canonical Ltd
2
 
#
3
 
# This program is free software; you can redistribute it and/or modify
4
 
# it under the terms of the GNU General Public License as published by
5
 
# the Free Software Foundation; either version 2 of the License, or
6
 
# (at your option) any later version.
7
 
#
8
 
# This program is distributed in the hope that it will be useful,
9
 
# but WITHOUT ANY WARRANTY; without even the implied warranty of
10
 
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
11
 
# GNU General Public License for more details.
12
 
#
13
 
# You should have received a copy of the GNU General Public License
14
 
# along with this program; if not, write to the Free Software
15
 
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
16
 
#
17
 
 
18
 
"""Tests for writing fixed size chunks with compression."""
19
 
 
20
 
import zlib
21
 
 
22
 
from bzrlib import chunk_writer
23
 
from bzrlib.tests import TestCaseWithTransport
24
 
 
25
 
 
26
 
class TestWriter(TestCaseWithTransport):
27
 
 
28
 
    def check_chunk(self, bytes_list, size):
29
 
        bytes = ''.join(bytes_list)
30
 
        self.assertEqual(size, len(bytes))
31
 
        return zlib.decompress(bytes)
32
 
 
33
 
    def test_chunk_writer_empty(self):
34
 
        writer = chunk_writer.ChunkWriter(4096)
35
 
        bytes_list, unused, padding = writer.finish()
36
 
        node_bytes = self.check_chunk(bytes_list, 4096)
37
 
        self.assertEqual("", node_bytes)
38
 
        self.assertEqual(None, unused)
39
 
        # Only a zlib header.
40
 
        self.assertEqual(4088, padding)
41
 
 
42
 
    def test_optimize_for_speed(self):
43
 
        writer = chunk_writer.ChunkWriter(4096)
44
 
        writer.set_optimize(for_size=False)
45
 
        self.assertEqual(chunk_writer.ChunkWriter._repack_opts_for_speed,
46
 
                         (writer._max_repack, writer._max_zsync))
47
 
        writer = chunk_writer.ChunkWriter(4096, optimize_for_size=False)
48
 
        self.assertEqual(chunk_writer.ChunkWriter._repack_opts_for_speed,
49
 
                         (writer._max_repack, writer._max_zsync))
50
 
 
51
 
    def test_optimize_for_size(self):
52
 
        writer = chunk_writer.ChunkWriter(4096)
53
 
        writer.set_optimize(for_size=True)
54
 
        self.assertEqual(chunk_writer.ChunkWriter._repack_opts_for_size,
55
 
                         (writer._max_repack, writer._max_zsync))
56
 
        writer = chunk_writer.ChunkWriter(4096, optimize_for_size=True)
57
 
        self.assertEqual(chunk_writer.ChunkWriter._repack_opts_for_size,
58
 
                         (writer._max_repack, writer._max_zsync))
59
 
 
60
 
    def test_some_data(self):
61
 
        writer = chunk_writer.ChunkWriter(4096)
62
 
        writer.write("foo bar baz quux\n")
63
 
        bytes_list, unused, padding = writer.finish()
64
 
        node_bytes = self.check_chunk(bytes_list, 4096)
65
 
        self.assertEqual("foo bar baz quux\n", node_bytes)
66
 
        self.assertEqual(None, unused)
67
 
        # More than just the header..
68
 
        self.assertEqual(4073, padding)
69
 
 
70
 
    def test_too_much_data_does_not_exceed_size(self):
71
 
        # Generate enough data to exceed 4K
72
 
        lines = []
73
 
        for group in range(48):
74
 
            offset = group * 50
75
 
            numbers = range(offset, offset + 50)
76
 
            # Create a line with this group
77
 
            lines.append(''.join(map(str, numbers)) + '\n')
78
 
        writer = chunk_writer.ChunkWriter(4096)
79
 
        for idx, line in enumerate(lines):
80
 
            if writer.write(line):
81
 
                self.assertEqual(46, idx)
82
 
                break
83
 
        bytes_list, unused, _ = writer.finish()
84
 
        node_bytes = self.check_chunk(bytes_list, 4096)
85
 
        # the first 46 lines should have been added
86
 
        expected_bytes = ''.join(lines[:46])
87
 
        self.assertEqualDiff(expected_bytes, node_bytes)
88
 
        # And the line that failed should have been saved for us
89
 
        self.assertEqual(lines[46], unused)
90
 
 
91
 
    def test_too_much_data_preserves_reserve_space(self):
92
 
        # Generate enough data to exceed 4K
93
 
        lines = []
94
 
        for group in range(48):
95
 
            offset = group * 50
96
 
            numbers = range(offset, offset + 50)
97
 
            # Create a line with this group
98
 
            lines.append(''.join(map(str, numbers)) + '\n')
99
 
        writer = chunk_writer.ChunkWriter(4096, 256)
100
 
        for idx, line in enumerate(lines):
101
 
            if writer.write(line):
102
 
                self.assertEqual(44, idx)
103
 
                break
104
 
        else:
105
 
            self.fail('We were able to write all lines')
106
 
        self.assertFalse(writer.write("A"*256, reserved=True))
107
 
        bytes_list, unused, _ = writer.finish()
108
 
        node_bytes = self.check_chunk(bytes_list, 4096)
109
 
        # the first 44 lines should have been added
110
 
        expected_bytes = ''.join(lines[:44]) + "A"*256
111
 
        self.assertEqualDiff(expected_bytes, node_bytes)
112
 
        # And the line that failed should have been saved for us
113
 
        self.assertEqual(lines[44], unused)