/brz/remove-bazaar

To get this branch, use:
bzr branch http://gegoxaren.bato24.eu/bzr/brz/remove-bazaar

« back to all changes in this revision

Viewing changes to bzrlib/chunk_writer.py

  • Committer: John Arbash Meinel
  • Date: 2008-08-26 00:56:10 UTC
  • mto: This revision was merged to the branch mainline in revision 3653.
  • Revision ID: john@arbash-meinel.com-20080826005610-275jq9uqje3prqry
Fix up the test suite now that things don't pack as well.

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
# Copyright (C) 2008 Canonical Ltd
 
2
#
 
3
# This program is free software; you can redistribute it and/or modify
 
4
# it under the terms of the GNU General Public License as published by
 
5
# the Free Software Foundation; either version 2 of the License, or
 
6
# (at your option) any later version.
 
7
#
 
8
# This program is distributed in the hope that it will be useful,
 
9
# but WITHOUT ANY WARRANTY; without even the implied warranty of
 
10
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
11
# GNU General Public License for more details.
 
12
#
 
13
# You should have received a copy of the GNU General Public License
 
14
# along with this program; if not, write to the Free Software
 
15
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
 
16
#
 
17
 
 
18
"""ChunkWriter: write compressed data out with a fixed upper bound."""
 
19
 
 
20
import zlib
 
21
from zlib import Z_FINISH, Z_SYNC_FLUSH
 
22
 
 
23
# [max_repack, buffer_full, repacks_with_space, min_compression,
 
24
#  total_bytes_in, total_bytes_out, avg_comp,
 
25
#  bytes_autopack, bytes_sync_packed, num_full_by_zsync]
 
26
_stats = [0, 0, 0, 999, 0, 0, 0, 0, 0, 0]
 
27
 
 
28
class ChunkWriter(object):
 
29
    """ChunkWriter allows writing of compressed data with a fixed size.
 
30
 
 
31
    If less data is supplied than fills a chunk, the chunk is padded with
 
32
    NULL bytes. If more data is supplied, then the writer packs as much
 
33
    in as it can, but never splits any item it was given.
 
34
 
 
35
    The algorithm for packing is open to improvement! Current it is:
 
36
     - write the bytes given
 
37
     - if the total seen bytes so far exceeds the chunk size, flush.
 
38
 
 
39
    :cvar _max_repack: To fit the maximum number of entries into a node, we
 
40
        will sometimes start over and compress the whole list to get tighter
 
41
        packing. We get diminishing returns after a while, so this limits the
 
42
        number of times we will try.
 
43
        In testing, some values for bzr.dev::
 
44
 
 
45
            repack  time  MB   max   full
 
46
             1       7.5  4.6  1140  0
 
47
             2       8.4  4.2  1036  1          6.8
 
48
             3       9.8  4.1  1012  278
 
49
             4      10.8  4.1  728   945
 
50
            20      11.1  4.1  0     1012
 
51
 
 
52
            repack = 0
 
53
            zsync   time  MB    repack  max_z   time w/ add_node
 
54
             0       6.7  24.7  0       6270    5.0
 
55
             1       6.5  13.2  0       3342    4.3
 
56
             2       6.6   9.6  0       2414    4.9
 
57
             5       6.5   6.2  0       1549    4.8
 
58
             6       6.5   5.8  1       1435    4.8
 
59
             7       6.6   5.5  19      1337    4.8
 
60
             8       6.7   5.3  81      1220    4.4
 
61
            10       6.8   5.0  260     967     5.3
 
62
            11       6.8   4.9  366     839     5.3
 
63
            12       6.9   4.8  454     731     5.1
 
64
            15       7.2   4.7  704     450     5.8
 
65
            20       7.7   4.6  1133    7       5.8
 
66
 
 
67
        In testing, some values for mysql-unpacked::
 
68
 
 
69
                    next_bytes estim
 
70
            repack  time  MB    hit_max full
 
71
             1      51.7  15.4  3913  0
 
72
             2      54.4  13.7  3467  0         35.4
 
73
            20      67.0  13.4  0     3380      46.7
 
74
 
 
75
            repack=0
 
76
            zsync                               time w/ add_node
 
77
             0      47.7 116.5  0       29782   29.5
 
78
             1      48.5  60.2  0       15356   27.8
 
79
             2      48.1  42.4  0       10822   27.8
 
80
             5      48.3  25.5  0       6491    26.8
 
81
             6      48.0  23.2  13      5896    27.3
 
82
             7      48.1  21.6  29      5451    27.5
 
83
             8      48.1  20.3  52      5108    27.1
 
84
            10      46.9  18.6  195     4526    29.4
 
85
            11      48.8  18.0  421     4143    29.2
 
86
            12      47.4  17.5  702     3738    28.0
 
87
            15      49.6  16.5  1223    2969    28.9
 
88
            20      48.9  15.7  2182    1810    29.6
 
89
            30            15.4  3891    23      31.4
 
90
    """
 
91
 
 
92
    _max_repack = 0
 
93
    _max_zsync = 8
 
94
 
 
95
    def __init__(self, chunk_size, reserved=0):
 
96
        """Create a ChunkWriter to write chunk_size chunks.
 
97
 
 
98
        :param chunk_size: The total byte count to emit at the end of the
 
99
            chunk.
 
100
        :param reserved: How many bytes to allow for reserved data. reserved
 
101
            data space can only be written to via the write_reserved method.
 
102
        """
 
103
        self.chunk_size = chunk_size
 
104
        self.compressor = zlib.compressobj()
 
105
        self.bytes_in = []
 
106
        self.bytes_list = []
 
107
        self.bytes_out_len = 0
 
108
        self.compressed = None
 
109
        self.seen_bytes = 0
 
110
        # bytes that have been seen, but not included in a flush to out yet
 
111
        self.unflushed_in_bytes = 0
 
112
        self.num_repack = 0
 
113
        self.num_zsync = 0
 
114
        self.done = False # We will accept no more bytes
 
115
        self.unused_bytes = None
 
116
        self.reserved_size = reserved
 
117
 
 
118
    def finish(self):
 
119
        """Finish the chunk.
 
120
 
 
121
        This returns the final compressed chunk, and either None, or the
 
122
        bytes that did not fit in the chunk.
 
123
        """
 
124
        self.bytes_in = None # Free the data cached so far, we don't need it
 
125
        out = self.compressor.flush(Z_FINISH)
 
126
        self.bytes_list.append(out)
 
127
        self.bytes_out_len += len(out)
 
128
        if self.num_repack > 0 and self.bytes_out_len > 0:
 
129
            comp = float(self.seen_bytes) / self.bytes_out_len
 
130
            if comp < _stats[3]:
 
131
                _stats[3] = comp
 
132
        _stats[4] += self.seen_bytes
 
133
        _stats[5] += self.bytes_out_len
 
134
        _stats[6] = float(_stats[4]) / _stats[5]
 
135
 
 
136
        if self._max_repack == 0 and self.num_repack == 1:
 
137
            _stats[9] += 1
 
138
 
 
139
        if self.bytes_out_len > self.chunk_size:
 
140
            raise AssertionError('Somehow we ended up with too much'
 
141
                                 ' compressed data, %d > %d'
 
142
                                 % (self.bytes_out_len, self.chunk_size))
 
143
        nulls_needed = self.chunk_size - self.bytes_out_len
 
144
        if nulls_needed:
 
145
            self.bytes_list.append("\x00" * nulls_needed)
 
146
        return self.bytes_list, self.unused_bytes, nulls_needed
 
147
 
 
148
    def _recompress_all_bytes_in(self, extra_bytes=None):
 
149
        """Recompress the current bytes_in, and optionally more.
 
150
 
 
151
        :param extra_bytes: Optional, if supplied we will try to add it with
 
152
            Z_SYNC_FLUSH
 
153
        :return: (bytes_out, compressor, alt_compressed)
 
154
            bytes_out   is the compressed bytes returned from the compressor
 
155
            compressor  An object with everything packed in so far, and
 
156
                        Z_SYNC_FLUSH called.
 
157
            alt_compressed  If the compressor supports copy(), then this is a
 
158
                            snapshot just before extra_bytes is added.
 
159
                            It is (bytes_out, compressor) as well.
 
160
                            The idea is if you find you cannot fit the new
 
161
                            bytes, you don't have to start over.
 
162
                            And if you *can* you don't have to Z_SYNC_FLUSH
 
163
                            yet.
 
164
        """
 
165
        compressor = zlib.compressobj()
 
166
        bytes_out = []
 
167
        append = bytes_out.append
 
168
        compress = compressor.compress
 
169
        for accepted_bytes in self.bytes_in:
 
170
            out = compress(accepted_bytes)
 
171
            if out:
 
172
                append(out)
 
173
        if extra_bytes:
 
174
            out = compress(extra_bytes)
 
175
            out += compressor.flush(Z_SYNC_FLUSH)
 
176
            append(out)
 
177
        bytes_out_len = sum(map(len, bytes_out))
 
178
        return bytes_out, bytes_out_len, compressor
 
179
 
 
180
    def write(self, bytes, reserved=False):
 
181
        """Write some bytes to the chunk.
 
182
 
 
183
        If the bytes fit, False is returned. Otherwise True is returned
 
184
        and the bytes have not been added to the chunk.
 
185
        """
 
186
        if self.num_repack > self._max_repack and not reserved:
 
187
            self.unused_bytes = bytes
 
188
            return True
 
189
        if reserved:
 
190
            capacity = self.chunk_size
 
191
        else:
 
192
            capacity = self.chunk_size - self.reserved_size
 
193
        comp = self.compressor
 
194
        # Check to see if the currently unflushed bytes would fit with a bit of
 
195
        # room to spare, assuming no compression.
 
196
        next_unflushed = self.unflushed_in_bytes + len(bytes)
 
197
        remaining_capacity = capacity - self.bytes_out_len - 10
 
198
        if (next_unflushed < remaining_capacity):
 
199
            # Yes, just push it in, assuming it will fit
 
200
            out = comp.compress(bytes)
 
201
            if out:
 
202
                self.bytes_list.append(out)
 
203
                self.bytes_out_len += len(out)
 
204
            self.bytes_in.append(bytes)
 
205
            self.seen_bytes += len(bytes)
 
206
            self.unflushed_in_bytes += len(bytes)
 
207
            _stats[7] += 1 # len(bytes)
 
208
        else:
 
209
            # This may or may not fit, try to add it with Z_SYNC_FLUSH
 
210
            _stats[8] += 1 # len(bytes)
 
211
            # Note: It is tempting to do this as a look-ahead pass, and to
 
212
            # 'copy()' the compressor before flushing. However, it seems that
 
213
            # 'flush()' is when the compressor actually does most work
 
214
            # (consider it the real compression pass over the data-so-far).
 
215
            # Which means that it is the same thing as increasing repack,
 
216
            # similar cost, same benefit. And this way we still have the
 
217
            # 'repack' knob that can be adjusted, and not depend on a
 
218
            # platform-specific 'copy()' function.
 
219
            self.num_zsync += 1
 
220
            if self._max_repack == 0 and self.num_zsync > self._max_zsync:
 
221
                self.num_repack += 1
 
222
                return True
 
223
            out = comp.compress(bytes)
 
224
            out += comp.flush(Z_SYNC_FLUSH)
 
225
            self.unflushed_in_bytes = 0
 
226
            if out:
 
227
                self.bytes_list.append(out)
 
228
                self.bytes_out_len += len(out)
 
229
 
 
230
            # We are a bit extra conservative, because it seems that you *can*
 
231
            # get better compression with Z_SYNC_FLUSH than a full compress. It
 
232
            # is probably very rare, but we were able to trigger it.
 
233
            if self.num_repack == 0:
 
234
                safety_margin = 100
 
235
            else:
 
236
                safety_margin = 10
 
237
            if self.bytes_out_len + safety_margin <= capacity:
 
238
                # It fit, so mark it added
 
239
                self.bytes_in.append(bytes)
 
240
                self.seen_bytes += len(bytes)
 
241
            else:
 
242
                # We are over budget, try to squeeze this in without any
 
243
                # Z_SYNC_FLUSH calls
 
244
                self.num_repack += 1
 
245
                (bytes_out, this_len,
 
246
                 compressor) = self._recompress_all_bytes_in(bytes)
 
247
                if self.num_repack >= self._max_repack:
 
248
                    # When we get *to* _max_repack, bump over so that the
 
249
                    # earlier > _max_repack will be triggered.
 
250
                    self.num_repack += 1
 
251
                    _stats[0] += 1
 
252
                if this_len + 10 > capacity:
 
253
                    (bytes_out, this_len,
 
254
                     compressor) = self._recompress_all_bytes_in()
 
255
                    _stats[1] += 1
 
256
                    self.compressor = compressor
 
257
                    # Force us to not allow more data
 
258
                    self.num_repack = self._max_repack + 1
 
259
                    self.bytes_list = bytes_out
 
260
                    self.bytes_out_len = this_len
 
261
                    self.unused_bytes = bytes
 
262
                    return True
 
263
                else:
 
264
                    # This fits when we pack it tighter, so use the new packing
 
265
                    # There is one Z_SYNC_FLUSH call in
 
266
                    # _recompress_all_bytes_in
 
267
                    _stats[2] += 1
 
268
                    self.compressor = compressor
 
269
                    self.bytes_in.append(bytes)
 
270
                    self.bytes_list = bytes_out
 
271
                    self.bytes_out_len = this_len
 
272
        return False
 
273