1
 
# Copyright (C) 2008 Canonical Ltd
 
3
 
# This program is free software; you can redistribute it and/or modify
 
4
 
# it under the terms of the GNU General Public License as published by
 
5
 
# the Free Software Foundation; either version 2 of the License, or
 
6
 
# (at your option) any later version.
 
8
 
# This program is distributed in the hope that it will be useful,
 
9
 
# but WITHOUT ANY WARRANTY; without even the implied warranty of
 
10
 
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
11
 
# GNU General Public License for more details.
 
13
 
# You should have received a copy of the GNU General Public License
 
14
 
# along with this program; if not, write to the Free Software
 
15
 
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
 
18
 
"""ChunkWriter: write compressed data out with a fixed upper bound."""
 
21
 
from zlib import Z_FINISH, Z_SYNC_FLUSH
 
24
 
class ChunkWriter(object):
 
25
 
    """ChunkWriter allows writing of compressed data with a fixed size.
 
27
 
    If less data is supplied than fills a chunk, the chunk is padded with
 
28
 
    NULL bytes. If more data is supplied, then the writer packs as much
 
29
 
    in as it can, but never splits any item it was given.
 
31
 
    The algorithm for packing is open to improvement! Current it is:
 
32
 
     - write the bytes given
 
33
 
     - if the total seen bytes so far exceeds the chunk size, flush.
 
35
 
    :cvar _max_repack: To fit the maximum number of entries into a node, we
 
36
 
        will sometimes start over and compress the whole list to get tighter
 
37
 
        packing. We get diminishing returns after a while, so this limits the
 
38
 
        number of times we will try.
 
39
 
        The default is to try to avoid recompressing entirely, but setting this
 
40
 
        to something like 20 will give maximum compression.
 
42
 
    :cvar _max_zsync: Another tunable nob. If _max_repack is set to 0, then you
 
43
 
        can limit the number of times we will try to pack more data into a
 
44
 
        node. This allows us to do a single compression pass, rather than
 
45
 
        trying until we overflow, and then recompressing again.
 
47
 
    #    In testing, some values for bzr.dev::
 
48
 
    #        repack  time  MB   max   full
 
55
 
    #        zsync   time  MB    repack  stop_for_z
 
69
 
    #    In testing, some values for mysql-unpacked::
 
71
 
    #        repack  time  MB    full    stop_for_repack
 
77
 
    #         0      29.5 116.5  0       29782
 
84
 
    #        10      29.4  18.6  195     4526
 
85
 
    #        11      29.2  18.0  421     4143
 
86
 
    #        12      28.0  17.5  702     3738
 
87
 
    #        15      28.9  16.5  1223    2969
 
88
 
    #        20      29.6  15.7  2182    1810
 
89
 
    #        30      31.4  15.4  3891    23
 
91
 
    # Tuple of (num_repack_attempts, num_zsync_attempts)
 
92
 
    # num_zsync_attempts only has meaning if num_repack_attempts is 0.
 
93
 
    _repack_opts_for_speed = (0, 8)
 
94
 
    _repack_opts_for_size = (20, 0)
 
96
 
    def __init__(self, chunk_size, reserved=0, optimize_for_size=False):
 
97
 
        """Create a ChunkWriter to write chunk_size chunks.
 
99
 
        :param chunk_size: The total byte count to emit at the end of the
 
101
 
        :param reserved: How many bytes to allow for reserved data. reserved
 
102
 
            data space can only be written to via the write(..., reserved=True).
 
104
 
        self.chunk_size = chunk_size
 
105
 
        self.compressor = zlib.compressobj()
 
108
 
        self.bytes_out_len = 0
 
109
 
        # bytes that have been seen, but not included in a flush to out yet
 
110
 
        self.unflushed_in_bytes = 0
 
113
 
        self.unused_bytes = None
 
114
 
        self.reserved_size = reserved
 
115
 
        # Default is to make building fast rather than compact
 
116
 
        self.set_optimize(for_size=optimize_for_size)
 
121
 
        This returns the final compressed chunk, and either None, or the
 
122
 
        bytes that did not fit in the chunk.
 
124
 
        :return: (compressed_bytes, unused_bytes, num_nulls_needed)
 
125
 
            compressed_bytes    a list of bytes that were output from the
 
126
 
                                compressor. If the compressed length was not
 
127
 
                                exactly chunk_size, the final string will be a
 
128
 
                                string of all null bytes to pad this to
 
130
 
            unused_bytes        None, or the last bytes that were added, which
 
132
 
            num_nulls_needed    How many nulls are padded at the end
 
134
 
        self.bytes_in = None # Free the data cached so far, we don't need it
 
135
 
        out = self.compressor.flush(Z_FINISH)
 
136
 
        self.bytes_list.append(out)
 
137
 
        self.bytes_out_len += len(out)
 
139
 
        if self.bytes_out_len > self.chunk_size:
 
140
 
            raise AssertionError('Somehow we ended up with too much'
 
141
 
                                 ' compressed data, %d > %d'
 
142
 
                                 % (self.bytes_out_len, self.chunk_size))
 
143
 
        nulls_needed = self.chunk_size - self.bytes_out_len
 
145
 
            self.bytes_list.append("\x00" * nulls_needed)
 
146
 
        return self.bytes_list, self.unused_bytes, nulls_needed
 
148
 
    def set_optimize(self, for_size=True):
 
149
 
        """Change how we optimize our writes.
 
151
 
        :param for_size: If True, optimize for minimum space usage, otherwise
 
152
 
            optimize for fastest writing speed.
 
156
 
            opts = ChunkWriter._repack_opts_for_size
 
158
 
            opts = ChunkWriter._repack_opts_for_speed
 
159
 
        self._max_repack, self._max_zsync = opts
 
161
 
    def _recompress_all_bytes_in(self, extra_bytes=None):
 
162
 
        """Recompress the current bytes_in, and optionally more.
 
164
 
        :param extra_bytes: Optional, if supplied we will add it with
 
166
 
        :return: (bytes_out, bytes_out_len, alt_compressed)
 
167
 
            bytes_out   is the compressed bytes returned from the compressor
 
168
 
            bytes_out_len the length of the compressed output
 
169
 
            compressor  An object with everything packed in so far, and
 
172
 
        compressor = zlib.compressobj()
 
174
 
        append = bytes_out.append
 
175
 
        compress = compressor.compress
 
176
 
        for accepted_bytes in self.bytes_in:
 
177
 
            out = compress(accepted_bytes)
 
181
 
            out = compress(extra_bytes)
 
182
 
            out += compressor.flush(Z_SYNC_FLUSH)
 
184
 
        bytes_out_len = sum(map(len, bytes_out))
 
185
 
        return bytes_out, bytes_out_len, compressor
 
187
 
    def write(self, bytes, reserved=False):
 
188
 
        """Write some bytes to the chunk.
 
190
 
        If the bytes fit, False is returned. Otherwise True is returned
 
191
 
        and the bytes have not been added to the chunk.
 
193
 
        :param bytes: The bytes to include
 
194
 
        :param reserved: If True, we can use the space reserved in the
 
197
 
        if self.num_repack > self._max_repack and not reserved:
 
198
 
            self.unused_bytes = bytes
 
201
 
            capacity = self.chunk_size
 
203
 
            capacity = self.chunk_size - self.reserved_size
 
204
 
        comp = self.compressor
 
206
 
        # Check to see if the currently unflushed bytes would fit with a bit of
 
207
 
        # room to spare, assuming no compression.
 
208
 
        next_unflushed = self.unflushed_in_bytes + len(bytes)
 
209
 
        remaining_capacity = capacity - self.bytes_out_len - 10
 
210
 
        if (next_unflushed < remaining_capacity):
 
211
 
            # looks like it will fit
 
212
 
            out = comp.compress(bytes)
 
214
 
                self.bytes_list.append(out)
 
215
 
                self.bytes_out_len += len(out)
 
216
 
            self.bytes_in.append(bytes)
 
217
 
            self.unflushed_in_bytes += len(bytes)
 
219
 
            # This may or may not fit, try to add it with Z_SYNC_FLUSH
 
220
 
            # Note: It is tempting to do this as a look-ahead pass, and to
 
221
 
            #       'copy()' the compressor before flushing. However, it seems
 
222
 
            #       that Which means that it is the same thing as increasing
 
223
 
            #       repack, similar cost, same benefit. And this way we still
 
224
 
            #       have the 'repack' knob that can be adjusted, and not depend
 
225
 
            #       on a platform-specific 'copy()' function.
 
227
 
            if self._max_repack == 0 and self.num_zsync > self._max_zsync:
 
229
 
                self.unused_bytes = bytes
 
231
 
            out = comp.compress(bytes)
 
232
 
            out += comp.flush(Z_SYNC_FLUSH)
 
233
 
            self.unflushed_in_bytes = 0
 
235
 
                self.bytes_list.append(out)
 
236
 
                self.bytes_out_len += len(out)
 
238
 
            # We are a bit extra conservative, because it seems that you *can*
 
239
 
            # get better compression with Z_SYNC_FLUSH than a full compress. It
 
240
 
            # is probably very rare, but we were able to trigger it.
 
241
 
            if self.num_repack == 0:
 
245
 
            if self.bytes_out_len + safety_margin <= capacity:
 
246
 
                # It fit, so mark it added
 
247
 
                self.bytes_in.append(bytes)
 
249
 
                # We are over budget, try to squeeze this in without any
 
252
 
                (bytes_out, this_len,
 
253
 
                 compressor) = self._recompress_all_bytes_in(bytes)
 
254
 
                if self.num_repack >= self._max_repack:
 
255
 
                    # When we get *to* _max_repack, bump over so that the
 
256
 
                    # earlier > _max_repack will be triggered.
 
258
 
                if this_len + 10 > capacity:
 
259
 
                    (bytes_out, this_len,
 
260
 
                     compressor) = self._recompress_all_bytes_in()
 
261
 
                    self.compressor = compressor
 
262
 
                    # Force us to not allow more data
 
263
 
                    self.num_repack = self._max_repack + 1
 
264
 
                    self.bytes_list = bytes_out
 
265
 
                    self.bytes_out_len = this_len
 
266
 
                    self.unused_bytes = bytes
 
269
 
                    # This fits when we pack it tighter, so use the new packing
 
270
 
                    self.compressor = compressor
 
271
 
                    self.bytes_in.append(bytes)
 
272
 
                    self.bytes_list = bytes_out
 
273
 
                    self.bytes_out_len = this_len