# Copyright (C) 2008 Canonical Ltd
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA

"""ChunkWriter: write compressed data out with a fixed upper bound."""

import zlib
from zlib import Z_FINISH, Z_SYNC_FLUSH

class ChunkWriter(object):
    """ChunkWriter allows writing of compressed data with a fixed size.

    If less data is supplied than fills a chunk, the chunk is padded with
    NULL bytes. If more data is supplied, then the writer packs as much
    in as it can, but never splits any item it was given.

    The algorithm for packing is open to improvement! Current it is:
     - write the bytes given
     - if the total seen bytes so far exceeds the chunk size, flush.

    :cvar _max_repack: To fit the maximum number of entries into a node, we
        will sometimes start over and compress the whole list to get tighter
        packing. We get diminishing returns after a while, so this limits the
        number of times we will try.
        In testing, some values for bzr.dev::

                    w/o copy    w/ copy     w/ copy ins w/ copy & save
            repack  time  MB    time  MB    time  MB    time  MB
             1       8.8  5.1    8.9  5.1    9.6  4.4   12.5  4.1
             2       9.6  4.4   10.1  4.3   10.4  4.2   11.1  4.1
             3      10.6  4.2   11.1  4.1   11.2  4.1   11.3  4.1
            20      12.9  4.1   12.2  4.1   12.3  4.1

        In testing, some values for mysql-unpacked::

                    w/o copy    w/ copy     w/ copy ins w/ copy & save
            repack  time  MB    time  MB    time  MB    time  MB
             2      59.3  14.1  62.6  13.5  64.3  13.4

    :cvar _default_min_compression_size: The expected minimum compression.
        While packing nodes into the page, we won't Z_SYNC_FLUSH until we have
        received this much input data. This saves time, because we don't bloat
        the result with SYNC entries (and then need to repack), but if it is
        set too high we will accept data that will never fit and trigger a
        costly repack later.
    """

    # See the class docstring for the meaning of these two tunables.
    _max_repack = 2
    _default_min_compression_size = 1.8

    def __init__(self, chunk_size, reserved=0):
        """Create a ChunkWriter to write chunk_size chunks.

        :param chunk_size: The total byte count to emit at the end of the
            chunk.
        :param reserved: How many bytes to allow for reserved data. reserved
            data space can only be written to via write(..., reserved=True).
        """
        self.chunk_size = chunk_size
        self.compressor = zlib.compressobj()
        # Raw (uncompressed) byte strings accepted so far; kept so we can
        # recompress everything from scratch when repacking.
        self.bytes_in = []
        # Compressed output fragments emitted so far.
        self.bytes_list = []
        self.bytes_out_len = 0
        self.compressed = None
        # Total uncompressed bytes accepted so far.
        self.seen_bytes = 0
        # How many times we have restarted compression to pack tighter.
        self.num_repack = 0
        # Holds the bytes that were rejected by the last failed write().
        self.unused_bytes = None
        self.reserved_size = reserved
        self.min_compress_size = self._default_min_compression_size

    def finish(self):
        """Finish the chunk.

        This returns the final compressed chunk, and either None, or the
        bytes that did not fit in the chunk.

        :return: (compressed_fragments, unused_bytes, nulls_needed) where
            compressed_fragments is a list of byte strings totalling exactly
            chunk_size, unused_bytes is the data rejected by the last write
            (or None), and nulls_needed is how many NUL padding bytes were
            appended.
        """
        self.bytes_in = None # Free the data cached so far, we don't need it
        out = self.compressor.flush(Z_FINISH)
        self.bytes_list.append(out)
        self.bytes_out_len += len(out)
        if self.bytes_out_len > self.chunk_size:
            raise AssertionError('Somehow we ended up with too much'
                                 ' compressed data, %d > %d'
                                 % (self.bytes_out_len, self.chunk_size))
        # Pad with NULs up to exactly chunk_size. Plain subtraction (not a
        # modulo) so that a chunk that is already exactly full gets zero
        # padding rather than a whole extra chunk of NULs.
        nulls_needed = self.chunk_size - self.bytes_out_len
        if nulls_needed:
            self.bytes_list.append(b"\x00" * nulls_needed)
        return self.bytes_list, self.unused_bytes, nulls_needed

    def _recompress_all_bytes_in(self, extra_bytes=None):
        """Recompress the current bytes_in, and optionally more.

        :param extra_bytes: Optional, if supplied we will try to add it with
            Z_SYNC_FLUSH
        :return: (bytes_out, bytes_out_len, compressor)
            bytes_out is the compressed bytes returned from the compressor
            bytes_out_len the length of the compressed output
            compressor An object with everything packed in so far, and
                Z_SYNC_FLUSH called (if extra_bytes was supplied).
        """
        compressor = zlib.compressobj()
        bytes_out = []
        # Hoist the bound methods out of the loop; this is a measured hot
        # path when repacking.
        append = bytes_out.append
        compress = compressor.compress
        for accepted_bytes in self.bytes_in:
            out = compress(accepted_bytes)
            if out:
                append(out)
        if extra_bytes:
            out = compress(extra_bytes)
            # SYNC_FLUSH so the fragment is fully emitted and measurable.
            out += compressor.flush(Z_SYNC_FLUSH)
            append(out)
        bytes_out_len = sum(map(len, bytes_out))
        return bytes_out, bytes_out_len, compressor

    def write(self, bytes, reserved=False):
        """Write some bytes to the chunk.

        If the bytes fit, False is returned. Otherwise True is returned
        and the bytes have not been added to the chunk (they are stashed in
        self.unused_bytes for the caller to retrieve via finish()).

        :param bytes: A byte string to add to the chunk.
        :param reserved: If True, allow this write to use the reserved space.
        :return: False if the bytes were accepted, True if they did not fit.
        """
        if reserved:
            capacity = self.chunk_size
        else:
            capacity = self.chunk_size - self.reserved_size
        # Check quickly to see if this is likely to put us outside of our
        # budget:
        next_seen_size = self.seen_bytes + len(bytes)
        comp = self.compressor
        if (next_seen_size < self.min_compress_size * capacity):
            # No need, we assume this will "just fit"
            out = comp.compress(bytes)
            if out:
                self.bytes_list.append(out)
                self.bytes_out_len += len(out)
            self.bytes_in.append(bytes)
            self.seen_bytes = next_seen_size
        else:
            if self.num_repack >= self._max_repack and not reserved:
                # We already know we don't want to try to fit more
                self.unused_bytes = bytes
                return True
            # This may or may not fit, try to add it with Z_SYNC_FLUSH
            out = comp.compress(bytes)
            out += comp.flush(Z_SYNC_FLUSH)
            if out:
                self.bytes_list.append(out)
                self.bytes_out_len += len(out)
            # +10 leaves headroom for the final Z_FINISH overhead.
            if self.bytes_out_len + 10 > capacity:
                # We are over budget, try to squeeze this in without any
                # Z_SYNC_FLUSH calls
                self.num_repack += 1
                bytes_out, this_len, compressor = \
                    self._recompress_all_bytes_in(bytes)
                if this_len + 10 > capacity:
                    # No way we can add anymore, we need to re-pack because our
                    # compressor is now out of sync.
                    # This seems to be rarely triggered over
                    # num_repack > _max_repack
                    bytes_out, this_len, compressor = \
                        self._recompress_all_bytes_in()
                    self.compressor = compressor
                    self.bytes_list = bytes_out
                    self.bytes_out_len = this_len
                    self.unused_bytes = bytes
                    return True
                else:
                    # This fits when we pack it tighter, so use the new packing
                    # There is one Z_SYNC_FLUSH call in
                    # _recompress_all_bytes_in
                    self.compressor = compressor
                    self.bytes_in.append(bytes)
                    self.bytes_list = bytes_out
                    self.bytes_out_len = this_len
            else:
                # It fit, so mark it added
                self.bytes_in.append(bytes)
                self.seen_bytes = next_seen_size
        return False