        will sometimes start over and compress the whole list to get tighter
        packing. We get diminishing returns after a while, so this limits the
        number of times we will try.
        The default is to try to avoid recompressing entirely, but setting this
        to something like 20 will give maximum compression.
    :cvar _max_zsync: Another tunable knob. If _max_repack is set to 0, then you
        can limit the number of times we will try to pack more data into a
        node. This allows us to do a single compression pass, rather than
        trying until we overflow, and then recompressing again.
    """

    # In testing, some values for bzr.dev::
    #     repack  time  MB     max    full
    #     zsync   time  MB     repack stop_for_z
    #
    # In testing, some values for mysql-unpacked::
    #     repack  time  MB     full   stop_for_repack
    #     zsync   time  MB     repack stop_for_z
    #       0     29.5  116.5     0   29782
    #      10     29.4   18.6   195    4526
    #      11     29.2   18.0   421    4143
    #      12     28.0   17.5   702    3738
    #      15     28.9   16.5  1223    2969
    #      20     29.6   15.7  2182    1810
    #      30     31.4   15.4  3891      23

    # Tuple of (num_repack_attempts, num_zsync_attempts)
    # num_zsync_attempts only has meaning if num_repack_attempts is 0.
    _repack_opts_for_speed = (0, 8)
    _repack_opts_for_size = (20, 0)
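
    # A minimal usage sketch (illustrative only; the 4096-byte page size below
    # is an assumption, not something this class mandates):
    #
    #   writer = ChunkWriter(4096)
    #   for item_bytes in items:
    #       if writer.write(item_bytes):
    #           break  # did not fit; finish this chunk and start a new one
    #   byte_chunks, unused, padding = writer.finish()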

    def __init__(self, chunk_size, reserved=0, optimize_for_size=False):
        """Create a ChunkWriter to write chunk_size chunks.

        :param chunk_size: The total byte count to emit at the end of the
            chunk.
        :param reserved: How many bytes to allow for reserved data. reserved
            data space can only be written to via the write(..., reserved=True).
        :param optimize_for_size: If True, start out optimizing for minimum
            space usage rather than for fastest writing speed.
        """
        self.chunk_size = chunk_size
        self.compressor = zlib.compressobj()
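        # bytes_in collects the raw byte strings accepted so far (kept so the
        # whole set can be recompressed from scratch), bytes_list the pieces
        # of compressed output, and bytes_out_len the running length of that
        # compressed output.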
        self.bytes_in = []
        self.bytes_list = []
        self.bytes_out_len = 0
        # bytes that have been seen, but not included in a flush to out yet
        self.unflushed_in_bytes = 0
        self.num_repack = 0
        self.num_zsync = 0
        self.unused_bytes = None
        self.reserved_size = reserved
        # Default is to make building fast rather than compact
        self.set_optimize(for_size=optimize_for_size)

    def finish(self):
        """Finish the chunk.

        This returns the final compressed chunk, and either None, or the
        bytes that did not fit in the chunk.

        :return: (compressed_bytes, unused_bytes, num_nulls_needed)
            compressed_bytes    a list of bytes that were output from the
                                compressor. If the compressed length was not
                                exactly chunk_size, the final string will be a
                                string of all null bytes to pad this to
                                chunk_size
            unused_bytes        None, or the last bytes that were added, which
                                we could not fit.
            num_nulls_needed    How many nulls are padded at the end
        """
        self.bytes_in = None  # Free the data cached so far, we don't need it
        out = self.compressor.flush(Z_FINISH)
        self.bytes_list.append(out)
        self.bytes_out_len += len(out)

        if self.bytes_out_len > self.chunk_size:
            raise AssertionError('Somehow we ended up with too much'
                                 ' compressed data, %d > %d'
                                 % (self.bytes_out_len, self.chunk_size))
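        # Pad the remainder of the chunk with NUL bytes so that the caller
        # always gets back exactly chunk_size bytes.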
        nulls_needed = self.chunk_size - self.bytes_out_len
        if nulls_needed:
            self.bytes_list.append("\x00" * nulls_needed)
        return self.bytes_list, self.unused_bytes, nulls_needed
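
    # For example (illustrative numbers): with chunk_size=4096 and 3000 bytes
    # of compressed output, finish() appends a final string of 1096 NUL bytes
    # and returns num_nulls_needed == 1096.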

    def set_optimize(self, for_size=True):
        """Change how we optimize our writes.

        :param for_size: If True, optimize for minimum space usage, otherwise
            optimize for fastest writing speed.
        :return: None
        """
        if for_size:
            opts = ChunkWriter._repack_opts_for_size
        else:
            opts = ChunkWriter._repack_opts_for_speed
        self._max_repack, self._max_zsync = opts
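        # e.g. (illustrative): set_optimize(for_size=True) selects
        # _repack_opts_for_size == (20, 0), allowing up to 20 repack passes
        # for the tightest chunks, while the speed default (0, 8) aims for a
        # single compression pass with at most 8 Z_SYNC_FLUSH attempts.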

    def _recompress_all_bytes_in(self, extra_bytes=None):
        """Recompress the current bytes_in, and optionally more.

        :param extra_bytes: Optional, if supplied we will add it with
            Z_SYNC_FLUSH
        :return: (bytes_out, bytes_out_len, compressor)
            bytes_out       is the compressed bytes returned from the compressor
            bytes_out_len   the length of the compressed output
            compressor      An object with everything packed in so far, and
                            Z_SYNC_FLUSH called.
        """
        compressor = zlib.compressobj()
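        # (Summary of the recompression step, per the docstring above: every
        # byte string already accepted into bytes_in is fed through this fresh
        # compressor, extra_bytes is then added with a Z_SYNC_FLUSH if given,
        # and the new output list, its total length, and the compressor are
        # returned.)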

    def write(self, bytes, reserved=False):
        """Write some bytes to the chunk.

        If the bytes fit, False is returned. Otherwise True is returned
        and the bytes have not been added to the chunk.

        :param bytes: The bytes to include
        :param reserved: If True, we can use the space reserved in the
            constructor.
        """
        if self.num_repack > self._max_repack and not reserved:
            self.unused_bytes = bytes
            return True
        if reserved:
            capacity = self.chunk_size
        else:
            capacity = self.chunk_size - self.reserved_size
        comp = self.compressor
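        # Two paths from here: if the new bytes clearly fit without flushing,
        # just feed them to the compressor; otherwise force a Z_SYNC_FLUSH to
        # find out how big the output really is, and repack if we overflow.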

        # Check to see if the currently unflushed bytes would fit with a bit
        # of room to spare, assuming no compression.
        next_unflushed = self.unflushed_in_bytes + len(bytes)
        remaining_capacity = capacity - self.bytes_out_len - 10
        if next_unflushed < remaining_capacity:
            # looks like it will fit
            out = comp.compress(bytes)
            if out:
                self.bytes_list.append(out)
                self.bytes_out_len += len(out)
            self.bytes_in.append(bytes)
            self.unflushed_in_bytes += len(bytes)
        else:
            # This may or may not fit, try to add it with Z_SYNC_FLUSH
            # Note: It is tempting to do this as a look-ahead pass, and to
            #       'copy()' the compressor before flushing. However, it seems
            #       that it works out to be the same thing as increasing
            #       repack: similar cost, same benefit. And this way we still
            #       have the 'repack' knob that can be adjusted, and not depend
            #       on a platform-specific 'copy()' function.
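            # num_zsync counts how many Z_SYNC_FLUSH attempts we have made;
            # when _max_repack is 0 this is the only limit on how long we
            # keep trying to squeeze more data into the chunk.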
            self.num_zsync += 1
            if self._max_repack == 0 and self.num_zsync > self._max_zsync:
                self.num_repack += 1
                self.unused_bytes = bytes
                return True
            out = comp.compress(bytes)
            out += comp.flush(Z_SYNC_FLUSH)
            self.unflushed_in_bytes = 0
            if out:
                self.bytes_list.append(out)
                self.bytes_out_len += len(out)

            # We are a bit extra conservative, because it seems that you *can*
            # get better compression with Z_SYNC_FLUSH than a full compress.
            # It is probably very rare, but we were able to trigger it.
            if self.num_repack == 0:
                safety_margin = 100
            else:
                safety_margin = 10
            if self.bytes_out_len + safety_margin <= capacity:
                # It fit, so mark it added
                self.bytes_in.append(bytes)
            else:
                # We are over budget, try to squeeze this in without any
                # Z_SYNC_FLUSH calls
                self.num_repack += 1
                (bytes_out, this_len,
                 compressor) = self._recompress_all_bytes_in(bytes)
                if self.num_repack >= self._max_repack:
                    # When we get *to* _max_repack, bump over so that the
                    # earlier > _max_repack will be triggered.
                    self.num_repack += 1
                if this_len + 10 > capacity:
                    # No way we can add anymore, we need to re-pack because
                    # our compressor is now out of sync.
                    # This seems to be rarely triggered, compared to the
                    #   num_repack > _max_repack case.
                    (bytes_out, this_len,
                     compressor) = self._recompress_all_bytes_in()
                    self.compressor = compressor
                    # Force us to not allow more data
                    self.num_repack = self._max_repack + 1
                    self.bytes_list = bytes_out
                    self.bytes_out_len = this_len
                    self.unused_bytes = bytes
                    return True
                else:
                    # This fits when we pack it tighter, so use the new packing
                    #   There is one Z_SYNC_FLUSH call in
                    #   _recompress_all_bytes_in
                    self.compressor = compressor
                    self.bytes_in.append(bytes)
                    self.bytes_list = bytes_out
                    self.bytes_out_len = this_len
        return False