import zlib

from .. import chunk_writer
from . import TestCaseWithTransport

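# A sketch of the interface these tests exercise (inferred from the
# assertions below, not a formal spec): ChunkWriter(size) zlib-compresses
# written bytes into a chunk of exactly `size` bytes, write() returns a true
# value once data stops fitting, and finish() returns a
# (bytes_list, unused_bytes, padding) tuple.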
class TestWriter(TestCaseWithTransport):
    def check_chunk(self, bytes_list, size):
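        # Join the chunk pieces, check the chunk is exactly `size` bytes,
        # and return the decompressed payload for further assertions.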
        data = b''.join(bytes_list)
        self.assertEqual(size, len(data))
        return zlib.decompress(data)

    def test_chunk_writer_empty(self):
        writer = chunk_writer.ChunkWriter(4096)
        bytes_list, unused, padding = writer.finish()
        node_bytes = self.check_chunk(bytes_list, 4096)
        self.assertEqual(b"", node_bytes)
        self.assertEqual(None, unused)
        # Only a zlib header.
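        # (An empty zlib stream is 8 bytes: a 2-byte header, an empty deflate
        # block, and the 4-byte Adler-32 checksum, so 4096 - 8 = 4088.)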
        self.assertEqual(4088, padding)

    def test_some_data(self):
        writer = chunk_writer.ChunkWriter(4096)
        writer.write(b"foo bar baz quux\n")
        bytes_list, unused, padding = writer.finish()
        node_bytes = self.check_chunk(bytes_list, 4096)
        self.assertEqual(b"foo bar baz quux\n", node_bytes)
        self.assertEqual(None, unused)
        # More than just the header.
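        # (This 17-byte payload compresses to a 23-byte zlib stream,
        # leaving 4096 - 23 = 4073 bytes of padding.)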
        self.assertEqual(4073, padding)

    def _make_lines(self):
        lines = []
        for group in range(48):
            offset = group * 50
            numbers = list(range(offset, offset + 50))
            # Create a line with this group
            lines.append(b''.join(b'%d' % n for n in numbers) + b'\n')
        return lines

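    # Note for the two overflow tests below: _make_lines() yields 48 lines of
    # concatenated decimal numbers (roughly 90-200 bytes each, over 8KB in
    # total), far more than a 4096-byte chunk can hold even after compression.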
    def test_too_much_data_does_not_exceed_size(self):
        # Generate enough data to exceed 4K
        lines = self._make_lines()
        writer = chunk_writer.ChunkWriter(4096)
        for idx, line in enumerate(lines):
            if writer.write(line):
                self.assertEqual(46, idx)
                break
        bytes_list, unused, _ = writer.finish()
        node_bytes = self.check_chunk(bytes_list, 4096)
        # the first 46 lines should have been added
        expected_bytes = b''.join(lines[:46])
        self.assertEqualDiff(expected_bytes, node_bytes)
        # And the line that failed should have been saved for us
        self.assertEqual(lines[46], unused)

    def test_too_much_data_preserves_reserve_space(self):
        # Generate enough data to exceed 4K
        lines = self._make_lines()
        writer = chunk_writer.ChunkWriter(4096, 256)
        for idx, line in enumerate(lines):
            if writer.write(line):
                self.assertEqual(44, idx)
                break
        else:
            self.fail('We were able to write all lines')
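        # A write flagged reserved=True may use the 256 reserved bytes that
        # normal writes were refused above; a falsy return means it fit.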
        self.assertFalse(writer.write(b"A"*256, reserved=True))
        bytes_list, unused, _ = writer.finish()
        node_bytes = self.check_chunk(bytes_list, 4096)
        # the first 44 lines should have been added
        expected_bytes = b''.join(lines[:44]) + b"A"*256
        self.assertEqualDiff(expected_bytes, node_bytes)
        # And the line that failed should have been saved for us
        self.assertEqual(lines[44], unused)