# Copyright (C) 2008 Canonical Ltd
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA

"""ChunkWriter: write compressed data out with a fixed upper bound."""

import zlib
from zlib import Z_FINISH, Z_SYNC_FLUSH
23
# [max_repack, buffer_full, repacks_with_space, min_compression,
24
# total_bytes_in, total_bytes_out, avg_comp,
25
# bytes_autopack, bytes_sync_packed, num_full_by_zsync]
26
_stats = [0, 0, 0, 999, 0, 0, 0, 0, 0, 0]
28
class ChunkWriter(object):
29
"""ChunkWriter allows writing of compressed data with a fixed size.
31
If less data is supplied than fills a chunk, the chunk is padded with
32
NULL bytes. If more data is supplied, then the writer packs as much
33
in as it can, but never splits any item it was given.
35
The algorithm for packing is open to improvement! Current it is:
36
- write the bytes given
37
- if the total seen bytes so far exceeds the chunk size, flush.
39
:cvar _max_repack: To fit the maximum number of entries into a node, we
40
will sometimes start over and compress the whole list to get tighter
41
packing. We get diminishing returns after a while, so this limits the
42
number of times we will try.
43
In testing, some values for bzr.dev::
45
repack time MB max full
53
zsync time MB repack max_z time w/ add_node
61
10 6.8 5.0 260 967 5.3
62
11 6.8 4.9 366 839 5.3
63
12 6.9 4.8 454 731 5.1
64
15 7.2 4.7 704 450 5.8
67
In testing, some values for mysql-unpacked::
70
repack time MB hit_max full
72
2 54.4 13.7 3467 0 35.4
73
20 67.0 13.4 0 3380 46.7
76
zsync time w/ add_node
77
0 47.7 116.5 0 29782 29.5
78
1 48.5 60.2 0 15356 27.8
79
2 48.1 42.4 0 10822 27.8
80
5 48.3 25.5 0 6491 26.8
81
6 48.0 23.2 13 5896 27.3
82
7 48.1 21.6 29 5451 27.5
83
8 48.1 20.3 52 5108 27.1
84
10 46.9 18.6 195 4526 29.4
85
11 48.8 18.0 421 4143 29.2
86
12 47.4 17.5 702 3738 28.0
87
15 49.6 16.5 1223 2969 28.9
88
20 48.9 15.7 2182 1810 29.6
95
def __init__(self, chunk_size, reserved=0):
96
"""Create a ChunkWriter to write chunk_size chunks.
98
:param chunk_size: The total byte count to emit at the end of the
100
:param reserved: How many bytes to allow for reserved data. reserved
101
data space can only be written to via the write_reserved method.
103
self.chunk_size = chunk_size
104
self.compressor = zlib.compressobj()
107
self.bytes_out_len = 0
108
self.compressed = None
110
# bytes that have been seen, but not included in a flush to out yet
111
self.unflushed_in_bytes = 0
114
self.done = False # We will accept no more bytes
115
self.unused_bytes = None
116
self.reserved_size = reserved
121
This returns the final compressed chunk, and either None, or the
122
bytes that did not fit in the chunk.
124
self.bytes_in = None # Free the data cached so far, we don't need it
125
out = self.compressor.flush(Z_FINISH)
126
self.bytes_list.append(out)
127
self.bytes_out_len += len(out)
128
if self.num_repack > 0 and self.bytes_out_len > 0:
129
comp = float(self.seen_bytes) / self.bytes_out_len
132
_stats[4] += self.seen_bytes
133
_stats[5] += self.bytes_out_len
134
_stats[6] = float(_stats[4]) / _stats[5]
136
if self._max_repack == 0 and self.num_repack == 1:
139
if self.bytes_out_len > self.chunk_size:
140
raise AssertionError('Somehow we ended up with too much'
141
' compressed data, %d > %d'
142
% (self.bytes_out_len, self.chunk_size))
143
nulls_needed = self.chunk_size - self.bytes_out_len
145
self.bytes_list.append("\x00" * nulls_needed)
146
return self.bytes_list, self.unused_bytes, nulls_needed
148
def _recompress_all_bytes_in(self, extra_bytes=None):
149
"""Recompress the current bytes_in, and optionally more.
151
:param extra_bytes: Optional, if supplied we will try to add it with
153
:return: (bytes_out, compressor, alt_compressed)
154
bytes_out is the compressed bytes returned from the compressor
155
compressor An object with everything packed in so far, and
157
alt_compressed If the compressor supports copy(), then this is a
158
snapshot just before extra_bytes is added.
159
It is (bytes_out, compressor) as well.
160
The idea is if you find you cannot fit the new
161
bytes, you don't have to start over.
162
And if you *can* you don't have to Z_SYNC_FLUSH
165
compressor = zlib.compressobj()
167
append = bytes_out.append
168
compress = compressor.compress
169
for accepted_bytes in self.bytes_in:
170
out = compress(accepted_bytes)
174
out = compress(extra_bytes)
175
out += compressor.flush(Z_SYNC_FLUSH)
177
bytes_out_len = sum(map(len, bytes_out))
178
return bytes_out, bytes_out_len, compressor
180
def write(self, bytes, reserved=False):
181
"""Write some bytes to the chunk.
183
If the bytes fit, False is returned. Otherwise True is returned
184
and the bytes have not been added to the chunk.
186
if self.num_repack > self._max_repack and not reserved:
187
self.unused_bytes = bytes
190
capacity = self.chunk_size
192
capacity = self.chunk_size - self.reserved_size
193
comp = self.compressor
194
# Check to see if the currently unflushed bytes would fit with a bit of
195
# room to spare, assuming no compression.
196
next_unflushed = self.unflushed_in_bytes + len(bytes)
197
remaining_capacity = capacity - self.bytes_out_len - 10
198
if (next_unflushed < remaining_capacity):
199
# Yes, just push it in, assuming it will fit
200
out = comp.compress(bytes)
202
self.bytes_list.append(out)
203
self.bytes_out_len += len(out)
204
self.bytes_in.append(bytes)
205
self.seen_bytes += len(bytes)
206
self.unflushed_in_bytes += len(bytes)
207
_stats[7] += 1 # len(bytes)
209
# This may or may not fit, try to add it with Z_SYNC_FLUSH
210
_stats[8] += 1 # len(bytes)
211
# Note: It is tempting to do this as a look-ahead pass, and to
212
# 'copy()' the compressor before flushing. However, it seems that
213
# 'flush()' is when the compressor actually does most work
214
# (consider it the real compression pass over the data-so-far).
215
# Which means that it is the same thing as increasing repack,
216
# similar cost, same benefit. And this way we still have the
217
# 'repack' knob that can be adjusted, and not depend on a
218
# platform-specific 'copy()' function.
220
if self._max_repack == 0 and self.num_zsync > self._max_zsync:
223
out = comp.compress(bytes)
224
out += comp.flush(Z_SYNC_FLUSH)
225
self.unflushed_in_bytes = 0
227
self.bytes_list.append(out)
228
self.bytes_out_len += len(out)
230
# We are a bit extra conservative, because it seems that you *can*
231
# get better compression with Z_SYNC_FLUSH than a full compress. It
232
# is probably very rare, but we were able to trigger it.
233
if self.num_repack == 0:
237
if self.bytes_out_len + safety_margin <= capacity:
238
# It fit, so mark it added
239
self.bytes_in.append(bytes)
240
self.seen_bytes += len(bytes)
242
# We are over budget, try to squeeze this in without any
245
(bytes_out, this_len,
246
compressor) = self._recompress_all_bytes_in(bytes)
247
if self.num_repack >= self._max_repack:
248
# When we get *to* _max_repack, bump over so that the
249
# earlier > _max_repack will be triggered.
252
if this_len + 10 > capacity:
253
(bytes_out, this_len,
254
compressor) = self._recompress_all_bytes_in()
256
self.compressor = compressor
257
# Force us to not allow more data
258
self.num_repack = self._max_repack + 1
259
self.bytes_list = bytes_out
260
self.bytes_out_len = this_len
261
self.unused_bytes = bytes
264
# This fits when we pack it tighter, so use the new packing
265
# There is one Z_SYNC_FLUSH call in
266
# _recompress_all_bytes_in
268
self.compressor = compressor
269
self.bytes_in.append(bytes)
270
self.bytes_list = bytes_out
271
self.bytes_out_len = this_len