/brz/remove-bazaar

To get this branch, use:
bzr branch http://gegoxaren.bato24.eu/bzr/brz/remove-bazaar

« back to all changes in this revision

Viewing changes to bzrlib/chunk_writer.py

  • Committer: John Arbash Meinel
  • Date: 2008-08-28 01:59:58 UTC
  • mto: This revision was merged to the branch mainline in revision 3653.
  • Revision ID: john@arbash-meinel.com-20080828015958-bvdt8spf2ls57s39
Clean out the global state, good for prototyping and tuning, bad for production code.
(as recommended by Robert)

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
# Copyright (C) 2008 Canonical Ltd
 
2
#
 
3
# This program is free software; you can redistribute it and/or modify
 
4
# it under the terms of the GNU General Public License as published by
 
5
# the Free Software Foundation; either version 2 of the License, or
 
6
# (at your option) any later version.
 
7
#
 
8
# This program is distributed in the hope that it will be useful,
 
9
# but WITHOUT ANY WARRANTY; without even the implied warranty of
 
10
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
11
# GNU General Public License for more details.
 
12
#
 
13
# You should have received a copy of the GNU General Public License
 
14
# along with this program; if not, write to the Free Software
 
15
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
 
16
#
 
17
 
 
18
"""ChunkWriter: write compressed data out with a fixed upper bound."""
 
19
 
 
20
import zlib
 
21
from zlib import Z_FINISH, Z_SYNC_FLUSH
 
22
 
 
23
 
 
24
class ChunkWriter(object):
 
25
    """ChunkWriter allows writing of compressed data with a fixed size.
 
26
 
 
27
    If less data is supplied than fills a chunk, the chunk is padded with
 
28
    NULL bytes. If more data is supplied, then the writer packs as much
 
29
    in as it can, but never splits any item it was given.
 
30
 
 
31
    The algorithm for packing is open to improvement! Current it is:
 
32
     - write the bytes given
 
33
     - if the total seen bytes so far exceeds the chunk size, flush.
 
34
 
 
35
    :cvar _max_repack: To fit the maximum number of entries into a node, we
 
36
        will sometimes start over and compress the whole list to get tighter
 
37
        packing. We get diminishing returns after a while, so this limits the
 
38
        number of times we will try.
 
39
        The default is to try to avoid recompressing entirely, but setting this
 
40
        to something like 20 will give maximum compression.
 
41
 
 
42
    :cvar _max_zsync: Another tunable nob. If _max_repack is set to 0, then you
 
43
        can limit the number of times we will try to pack more data into a
 
44
        node. This allows us to do a single compression pass, rather than
 
45
        trying until we overflow, and then recompressing again.
 
46
    """
 
47
    #    In testing, some values for bzr.dev::
 
48
    #        repack  time  MB   max   full
 
49
    #         1       7.5  4.6  1140  0
 
50
    #         2       8.4  4.2  1036  1          6.8
 
51
    #         3       9.8  4.1  1012  278
 
52
    #         4      10.8  4.1  728   945
 
53
    #        20      11.1  4.1  0     1012
 
54
    #        repack = 0
 
55
    #        zsync   time  MB    repack  max_z   time w/ add_node
 
56
    #         0       6.7  24.7  0       6270    5.0
 
57
    #         1       6.5  13.2  0       3342    4.3
 
58
    #         2       6.6   9.6  0       2414    4.9
 
59
    #         5       6.5   6.2  0       1549    4.8
 
60
    #         6       6.5   5.8  1       1435    4.8
 
61
    #         7       6.6   5.5  19      1337    4.8
 
62
    #         8       6.7   5.3  81      1220    4.4
 
63
    #        10       6.8   5.0  260     967     5.3
 
64
    #        11       6.8   4.9  366     839     5.3
 
65
    #        12       6.9   4.8  454     731     5.1
 
66
    #        15       7.2   4.7  704     450     5.8
 
67
    #        20       7.7   4.6  1133    7       5.8
 
68
 
 
69
    #    In testing, some values for mysql-unpacked::
 
70
    #                next_bytes estim
 
71
    #        repack  time  MB    hit_max full
 
72
    #         1      51.7  15.4  3913  0
 
73
    #         2      54.4  13.7  3467  0         35.4
 
74
    #        20      67.0  13.4  0     3380      46.7
 
75
    #        repack=0
 
76
    #        zsync                               time w/ add_node
 
77
    #         0      47.7 116.5  0       29782   29.5
 
78
    #         1      48.5  60.2  0       15356   27.8
 
79
    #         2      48.1  42.4  0       10822   27.8
 
80
    #         5      48.3  25.5  0       6491    26.8
 
81
    #         6      48.0  23.2  13      5896    27.3
 
82
    #         7      48.1  21.6  29      5451    27.5
 
83
    #         8      48.1  20.3  52      5108    27.1
 
84
    #        10      46.9  18.6  195     4526    29.4
 
85
    #        11      48.8  18.0  421     4143    29.2
 
86
    #        12      47.4  17.5  702     3738    28.0
 
87
    #        15      49.6  16.5  1223    2969    28.9
 
88
    #        20      48.9  15.7  2182    1810    29.6
 
89
    #        30            15.4  3891    23      31.4
 
90
 
 
91
    _max_repack = 0
 
92
    _max_zsync = 8
 
93
 
 
94
    def __init__(self, chunk_size, reserved=0):
 
95
        """Create a ChunkWriter to write chunk_size chunks.
 
96
 
 
97
        :param chunk_size: The total byte count to emit at the end of the
 
98
            chunk.
 
99
        :param reserved: How many bytes to allow for reserved data. reserved
 
100
            data space can only be written to via the write_reserved method.
 
101
        """
 
102
        self.chunk_size = chunk_size
 
103
        self.compressor = zlib.compressobj()
 
104
        self.bytes_in = []
 
105
        self.bytes_list = []
 
106
        self.bytes_out_len = 0
 
107
        self.compressed = None
 
108
        self.seen_bytes = 0
 
109
        # bytes that have been seen, but not included in a flush to out yet
 
110
        self.unflushed_in_bytes = 0
 
111
        self.num_repack = 0
 
112
        self.num_zsync = 0
 
113
        self.done = False # We will accept no more bytes
 
114
        self.unused_bytes = None
 
115
        self.reserved_size = reserved
 
116
 
 
117
    def finish(self):
 
118
        """Finish the chunk.
 
119
 
 
120
        This returns the final compressed chunk, and either None, or the
 
121
        bytes that did not fit in the chunk.
 
122
        """
 
123
        self.bytes_in = None # Free the data cached so far, we don't need it
 
124
        out = self.compressor.flush(Z_FINISH)
 
125
        self.bytes_list.append(out)
 
126
        self.bytes_out_len += len(out)
 
127
 
 
128
        if self.bytes_out_len > self.chunk_size:
 
129
            raise AssertionError('Somehow we ended up with too much'
 
130
                                 ' compressed data, %d > %d'
 
131
                                 % (self.bytes_out_len, self.chunk_size))
 
132
        nulls_needed = self.chunk_size - self.bytes_out_len
 
133
        if nulls_needed:
 
134
            self.bytes_list.append("\x00" * nulls_needed)
 
135
        return self.bytes_list, self.unused_bytes, nulls_needed
 
136
 
 
137
    def _recompress_all_bytes_in(self, extra_bytes=None):
 
138
        """Recompress the current bytes_in, and optionally more.
 
139
 
 
140
        :param extra_bytes: Optional, if supplied we will try to add it with
 
141
            Z_SYNC_FLUSH
 
142
        :return: (bytes_out, compressor, alt_compressed)
 
143
            bytes_out   is the compressed bytes returned from the compressor
 
144
            compressor  An object with everything packed in so far, and
 
145
                        Z_SYNC_FLUSH called.
 
146
            alt_compressed  If the compressor supports copy(), then this is a
 
147
                            snapshot just before extra_bytes is added.
 
148
                            It is (bytes_out, compressor) as well.
 
149
                            The idea is if you find you cannot fit the new
 
150
                            bytes, you don't have to start over.
 
151
                            And if you *can* you don't have to Z_SYNC_FLUSH
 
152
                            yet.
 
153
        """
 
154
        compressor = zlib.compressobj()
 
155
        bytes_out = []
 
156
        append = bytes_out.append
 
157
        compress = compressor.compress
 
158
        for accepted_bytes in self.bytes_in:
 
159
            out = compress(accepted_bytes)
 
160
            if out:
 
161
                append(out)
 
162
        if extra_bytes:
 
163
            out = compress(extra_bytes)
 
164
            out += compressor.flush(Z_SYNC_FLUSH)
 
165
            append(out)
 
166
        bytes_out_len = sum(map(len, bytes_out))
 
167
        return bytes_out, bytes_out_len, compressor
 
168
 
 
169
    def write(self, bytes, reserved=False):
 
170
        """Write some bytes to the chunk.
 
171
 
 
172
        If the bytes fit, False is returned. Otherwise True is returned
 
173
        and the bytes have not been added to the chunk.
 
174
        """
 
175
        if self.num_repack > self._max_repack and not reserved:
 
176
            self.unused_bytes = bytes
 
177
            return True
 
178
        if reserved:
 
179
            capacity = self.chunk_size
 
180
        else:
 
181
            capacity = self.chunk_size - self.reserved_size
 
182
        comp = self.compressor
 
183
        # Check to see if the currently unflushed bytes would fit with a bit of
 
184
        # room to spare, assuming no compression.
 
185
        next_unflushed = self.unflushed_in_bytes + len(bytes)
 
186
        remaining_capacity = capacity - self.bytes_out_len - 10
 
187
        if (next_unflushed < remaining_capacity):
 
188
            # Yes, just push it in, assuming it will fit
 
189
            out = comp.compress(bytes)
 
190
            if out:
 
191
                self.bytes_list.append(out)
 
192
                self.bytes_out_len += len(out)
 
193
            self.bytes_in.append(bytes)
 
194
            self.seen_bytes += len(bytes)
 
195
            self.unflushed_in_bytes += len(bytes)
 
196
        else:
 
197
            # This may or may not fit, try to add it with Z_SYNC_FLUSH
 
198
            # Note: It is tempting to do this as a look-ahead pass, and to
 
199
            # 'copy()' the compressor before flushing. However, it seems that
 
200
            # 'flush()' is when the compressor actually does most work
 
201
            # (consider it the real compression pass over the data-so-far).
 
202
            # Which means that it is the same thing as increasing repack,
 
203
            # similar cost, same benefit. And this way we still have the
 
204
            # 'repack' knob that can be adjusted, and not depend on a
 
205
            # platform-specific 'copy()' function.
 
206
            self.num_zsync += 1
 
207
            if self._max_repack == 0 and self.num_zsync > self._max_zsync:
 
208
                self.num_repack += 1
 
209
                return True
 
210
            out = comp.compress(bytes)
 
211
            out += comp.flush(Z_SYNC_FLUSH)
 
212
            self.unflushed_in_bytes = 0
 
213
            if out:
 
214
                self.bytes_list.append(out)
 
215
                self.bytes_out_len += len(out)
 
216
 
 
217
            # We are a bit extra conservative, because it seems that you *can*
 
218
            # get better compression with Z_SYNC_FLUSH than a full compress. It
 
219
            # is probably very rare, but we were able to trigger it.
 
220
            if self.num_repack == 0:
 
221
                safety_margin = 100
 
222
            else:
 
223
                safety_margin = 10
 
224
            if self.bytes_out_len + safety_margin <= capacity:
 
225
                # It fit, so mark it added
 
226
                self.bytes_in.append(bytes)
 
227
                self.seen_bytes += len(bytes)
 
228
            else:
 
229
                # We are over budget, try to squeeze this in without any
 
230
                # Z_SYNC_FLUSH calls
 
231
                self.num_repack += 1
 
232
                (bytes_out, this_len,
 
233
                 compressor) = self._recompress_all_bytes_in(bytes)
 
234
                if self.num_repack >= self._max_repack:
 
235
                    # When we get *to* _max_repack, bump over so that the
 
236
                    # earlier > _max_repack will be triggered.
 
237
                    self.num_repack += 1
 
238
                if this_len + 10 > capacity:
 
239
                    (bytes_out, this_len,
 
240
                     compressor) = self._recompress_all_bytes_in()
 
241
                    self.compressor = compressor
 
242
                    # Force us to not allow more data
 
243
                    self.num_repack = self._max_repack + 1
 
244
                    self.bytes_list = bytes_out
 
245
                    self.bytes_out_len = this_len
 
246
                    self.unused_bytes = bytes
 
247
                    return True
 
248
                else:
 
249
                    # This fits when we pack it tighter, so use the new packing
 
250
                    # There is one Z_SYNC_FLUSH call in
 
251
                    # _recompress_all_bytes_in
 
252
                    self.compressor = compressor
 
253
                    self.bytes_in.append(bytes)
 
254
                    self.bytes_list = bytes_out
 
255
                    self.bytes_out_len = this_len
 
256
        return False
 
257