/brz/remove-bazaar

To get this branch, use:
bzr branch http://gegoxaren.bato24.eu/bzr/brz/remove-bazaar


Viewing changes to breezy/tests/test_groupcompress.py

  • Committer: Jelmer Vernooij
  • Date: 2020-05-24 00:39:50 UTC
  • mto: This revision was merged to the branch mainline in revision 7504.
  • Revision ID: jelmer@jelmer.uk-20200524003950-bbc545r76vc5yajg
Add github action.

# Copyright (C) 2008-2011 Canonical Ltd
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA

"""Tests for group compression."""

import zlib

from .. import (
    config,
    errors,
    osutils,
    tests,
    trace,
    )
from ..bzr import (
    btree_index,
    groupcompress,
    knit,
    index as _mod_index,
    versionedfile,
    )
from ..osutils import sha_string
from .test__groupcompress import compiled_groupcompress_feature
from .scenarios import load_tests_apply_scenarios


def group_compress_implementation_scenarios():
    scenarios = [
        ('python', {'compressor': groupcompress.PythonGroupCompressor}),
        ]
    if compiled_groupcompress_feature.available():
        scenarios.append(('C',
                          {'compressor': groupcompress.PyrexGroupCompressor}))
    return scenarios


load_tests = load_tests_apply_scenarios
 

class TestGroupCompressor(tests.TestCase):

    def _chunks_to_repr_lines(self, chunks):
        return '\n'.join(map(repr, b''.join(chunks).split(b'\n')))

    def assertEqualDiffEncoded(self, expected, actual):
        """Compare the actual content to the expected content.

        :param expected: A group of chunks that we expect to see
        :param actual: The measured 'chunks'

        We will transform the chunks back into lines, and then run 'repr()'
        over them to handle non-ascii characters.
        """
        self.assertEqualDiff(self._chunks_to_repr_lines(expected),
                             self._chunks_to_repr_lines(actual))


class TestAllGroupCompressors(TestGroupCompressor):
    """Tests for GroupCompressor"""

    scenarios = group_compress_implementation_scenarios()
    compressor = None  # Set by scenario

    def test_empty_delta(self):
        compressor = self.compressor()
        self.assertEqual([], compressor.chunks)

    def test_one_nosha_delta(self):
        # diff against NULL
        compressor = self.compressor()
        text = b'strange\ncommon\n'
        sha1, start_point, end_point, _ = compressor.compress(
            ('label',), [text], len(text), None)
        self.assertEqual(sha_string(b'strange\ncommon\n'), sha1)
        expected_lines = b'f\x0fstrange\ncommon\n'
        self.assertEqual(expected_lines, b''.join(compressor.chunks))
        self.assertEqual(0, start_point)
        self.assertEqual(len(expected_lines), end_point)
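
    # A sketch (not part of the original test) of the fulltext encoding
    # asserted above, assuming the length is a one-byte count here, which
    # holds for short texts like this one:
    #
    #   text = b'strange\ncommon\n'            # len(text) == 15 == 0x0f
    #   record = b'f' + bytes([len(text)]) + text
    #   assert record == b'f\x0fstrange\ncommon\n'
    #
    # (The length is presumably a variable-width integer in general; one
    # byte suffices below 128.)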
 
    def test_empty_content(self):
        compressor = self.compressor()
        # Adding empty bytes should return the 'null' record
        sha1, start_point, end_point, kind = compressor.compress(
            ('empty',), [], 0, None)
        self.assertEqual(0, start_point)
        self.assertEqual(0, end_point)
        self.assertEqual('fulltext', kind)
        self.assertEqual(groupcompress._null_sha1, sha1)
        self.assertEqual(0, compressor.endpoint)
        self.assertEqual([], compressor.chunks)
        # Even after adding some content
        text = b'some\nbytes\n'
        compressor.compress(('content',), [text], len(text), None)
        self.assertTrue(compressor.endpoint > 0)
        sha1, start_point, end_point, kind = compressor.compress(
            ('empty2',), [], 0, None)
        self.assertEqual(0, start_point)
        self.assertEqual(0, end_point)
        self.assertEqual('fulltext', kind)
        self.assertEqual(groupcompress._null_sha1, sha1)

    def test_extract_from_compressor(self):
        # Knit fetching will try to reconstruct texts locally which results in
        # reading something that is in the compressor stream already.
        compressor = self.compressor()
        text = b'strange\ncommon long line\nthat needs a 16 byte match\n'
        sha1_1, _, _, _ = compressor.compress(
            ('label',), [text], len(text), None)
        expected_lines = list(compressor.chunks)
        text = b'common long line\nthat needs a 16 byte match\ndifferent\n'
        sha1_2, _, end_point, _ = compressor.compress(
            ('newlabel',), [text], len(text), None)
        # get the first out
        self.assertEqual(([b'strange\ncommon long line\n'
                           b'that needs a 16 byte match\n'], sha1_1),
                         compressor.extract(('label',)))
        # and the second
        self.assertEqual(([b'common long line\nthat needs a 16 byte match\n'
                           b'different\n'], sha1_2),
                         compressor.extract(('newlabel',)))

    def test_pop_last(self):
        compressor = self.compressor()
        text = b'some text\nfor the first entry\n'
        _, _, _, _ = compressor.compress(
            ('key1',), [text], len(text), None)
        expected_lines = list(compressor.chunks)
        text = b'some text\nfor the second entry\n'
        _, _, _, _ = compressor.compress(
            ('key2',), [text], len(text), None)
        compressor.pop_last()
        self.assertEqual(expected_lines, compressor.chunks)


class TestPyrexGroupCompressor(TestGroupCompressor):

    _test_needs_features = [compiled_groupcompress_feature]
    compressor = groupcompress.PyrexGroupCompressor

    def test_stats(self):
        compressor = self.compressor()
        chunks = [b'strange\n',
                  b'common very very long line\n',
                  b'plus more text\n']
        compressor.compress(
            ('label',), chunks, sum(map(len, chunks)), None)
        chunks = [
            b'common very very long line\n',
            b'plus more text\n',
            b'different\n',
            b'moredifferent\n']
        compressor.compress(
            ('newlabel',),
            chunks, sum(map(len, chunks)), None)
        chunks = [
            b'new\n',
            b'common very very long line\n',
            b'plus more text\n',
            b'different\n',
            b'moredifferent\n']
        compressor.compress(
            ('label3',), chunks, sum(map(len, chunks)), None)
        self.assertAlmostEqual(1.9, compressor.ratio(), 1)

    def test_two_nosha_delta(self):
        compressor = self.compressor()
        text = b'strange\ncommon long line\nthat needs a 16 byte match\n'
        sha1_1, _, _, _ = compressor.compress(
            ('label',), [text], len(text), None)
        expected_lines = list(compressor.chunks)
        text = b'common long line\nthat needs a 16 byte match\ndifferent\n'
        sha1_2, start_point, end_point, _ = compressor.compress(
            ('newlabel',), [text], len(text), None)
        self.assertEqual(sha_string(text), sha1_2)
        expected_lines.extend([
            # 'delta', delta length
            b'd\x0f',
            # source and target length
            b'\x36',
            # copy the line common
            b'\x91\x0a\x2c',  # copy, offset 0x0a, len 0x2c
            # add the line different, and the trailing newline
            b'\x0adifferent\n',  # insert 10 bytes
            ])
        self.assertEqualDiffEncoded(expected_lines, compressor.chunks)
        self.assertEqual(sum(map(len, expected_lines)), end_point)
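
    # A minimal decoder sketch for the delta bytes asserted above, assuming
    # the instruction layout implied by the inline comments: a command byte
    # with the high bit set is a copy (its low four bits select offset
    # bytes, the next three bits select length bytes); anything else is an
    # insert of that many literal bytes.
    #
    #   def decode(delta, pos):
    #       cmd = delta[pos]
    #       pos += 1
    #       if cmd & 0x80:  # copy instruction
    #           offset = length = 0
    #           for i in range(4):
    #               if cmd & (1 << i):
    #                   offset |= delta[pos] << (8 * i)
    #                   pos += 1
    #           for i in range(3):
    #               if cmd & (1 << (4 + i)):
    #                   length |= delta[pos] << (8 * i)
    #                   pos += 1
    #           return ('copy', offset, length), pos
    #       return ('insert', delta[pos:pos + cmd]), pos + cmd
    #
    # Under that reading, b'\x91\x0a\x2c' is 0b10010001: one offset byte
    # (0x0a) and one length byte (0x2c), matching the "copy, offset 0x0a,
    # len 0x2c" comment, and b'\x0adifferent\n' inserts the 10 literal
    # bytes of b'different\n'.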
 
    def test_three_nosha_delta(self):
        # The first interesting test: make a change that should use lines from
        # both parents.
        compressor = self.compressor()
        text = b'strange\ncommon very very long line\nwith some extra text\n'
        sha1_1, _, _, _ = compressor.compress(
            ('label',), [text], len(text), None)
        text = b'different\nmoredifferent\nand then some more\n'
        sha1_2, _, _, _ = compressor.compress(
            ('newlabel',), [text], len(text), None)
        expected_lines = list(compressor.chunks)
        text = (b'new\ncommon very very long line\nwith some extra text\n'
                b'different\nmoredifferent\nand then some more\n')
        sha1_3, start_point, end_point, _ = compressor.compress(
            ('label3',), [text], len(text), None)
        self.assertEqual(sha_string(text), sha1_3)
        expected_lines.extend([
            # 'delta', delta length
            b'd\x0b',
            # source and target length
            b'\x5f'
            # insert new
            b'\x03new',
            # Copy of first parent 'common' range
            b'\x91\x09\x31'  # copy, offset 0x09, 0x31 bytes
            # Copy of second parent 'different' range
            b'\x91\x3c\x2b'  # copy, offset 0x3c, 0x2b bytes
            ])
        self.assertEqualDiffEncoded(expected_lines, compressor.chunks)
        self.assertEqual(sum(map(len, expected_lines)), end_point)


class TestPythonGroupCompressor(TestGroupCompressor):

    compressor = groupcompress.PythonGroupCompressor

    def test_stats(self):
        compressor = self.compressor()
        chunks = [b'strange\n',
                  b'common very very long line\n',
                  b'plus more text\n']
        compressor.compress(
            ('label',), chunks, sum(map(len, chunks)), None)
        chunks = [
            b'common very very long line\n',
            b'plus more text\n',
            b'different\n',
            b'moredifferent\n']
        compressor.compress(
            ('newlabel',), chunks, sum(map(len, chunks)), None)
        chunks = [
            b'new\n',
            b'common very very long line\n',
            b'plus more text\n',
            b'different\n',
            b'moredifferent\n']
        compressor.compress(
            ('label3',),
            chunks, sum(map(len, chunks)), None)
        self.assertAlmostEqual(1.9, compressor.ratio(), 1)

    def test_two_nosha_delta(self):
        compressor = self.compressor()
        text = b'strange\ncommon long line\nthat needs a 16 byte match\n'
        sha1_1, _, _, _ = compressor.compress(
            ('label',), [text], len(text), None)
        expected_lines = list(compressor.chunks)
        text = b'common long line\nthat needs a 16 byte match\ndifferent\n'
        sha1_2, start_point, end_point, _ = compressor.compress(
            ('newlabel',), [text], len(text), None)
        self.assertEqual(sha_string(text), sha1_2)
        expected_lines.extend([
            # 'delta', delta length
            b'd\x0f',
            # target length
            b'\x36',
            # copy the line common
            b'\x91\x0a\x2c',  # copy, offset 0x0a, len 0x2c
            # add the line different, and the trailing newline
            b'\x0adifferent\n',  # insert 10 bytes
            ])
        self.assertEqualDiffEncoded(expected_lines, compressor.chunks)
        self.assertEqual(sum(map(len, expected_lines)), end_point)

    def test_three_nosha_delta(self):
        # The first interesting test: make a change that should use lines from
        # both parents.
        compressor = self.compressor()
        text = b'strange\ncommon very very long line\nwith some extra text\n'
        sha1_1, _, _, _ = compressor.compress(
            ('label',), [text], len(text), None)
        text = b'different\nmoredifferent\nand then some more\n'
        sha1_2, _, _, _ = compressor.compress(
            ('newlabel',), [text], len(text), None)
        expected_lines = list(compressor.chunks)
        text = (b'new\ncommon very very long line\nwith some extra text\n'
                b'different\nmoredifferent\nand then some more\n')
        sha1_3, start_point, end_point, _ = compressor.compress(
            ('label3',), [text], len(text), None)
        self.assertEqual(sha_string(text), sha1_3)
        expected_lines.extend([
            # 'delta', delta length
            b'd\x0c',
            # target length
            b'\x5f'
            # insert new
            b'\x04new\n',
            # Copy of first parent 'common' range
            b'\x91\x0a\x30'  # copy, offset 0x0a, 0x30 bytes
            # Copy of second parent 'different' range
            b'\x91\x3c\x2b'  # copy, offset 0x3c, 0x2b bytes
            ])
        self.assertEqualDiffEncoded(expected_lines, compressor.chunks)
        self.assertEqual(sum(map(len, expected_lines)), end_point)


class TestGroupCompressBlock(tests.TestCase):

    def make_block(self, key_to_text):
        """Create a GroupCompressBlock, filling it with the given texts."""
        compressor = groupcompress.GroupCompressor()
        start = 0
        for key in sorted(key_to_text):
            compressor.compress(
                key, [key_to_text[key]], len(key_to_text[key]), None)
        locs = dict((key, (start, end)) for key, (start, _, end, _)
                    in compressor.labels_deltas.items())
        block = compressor.flush()
        raw_bytes = block.to_bytes()
        # Go through from_bytes(to_bytes()) so that we start with a compressed
        # content object
        return locs, groupcompress.GroupCompressBlock.from_bytes(raw_bytes)

    def test_from_empty_bytes(self):
        self.assertRaises(ValueError,
                          groupcompress.GroupCompressBlock.from_bytes, b'')

    def test_from_minimal_bytes(self):
        block = groupcompress.GroupCompressBlock.from_bytes(
            b'gcb1z\n0\n0\n')
        self.assertIsInstance(block, groupcompress.GroupCompressBlock)
        self.assertIs(None, block._content)
        self.assertEqual(b'', block._z_content)
        block._ensure_content()
        self.assertEqual(b'', block._content)
        self.assertEqual(b'', block._z_content)
        block._ensure_content()  # Ensure content is safe to call 2x

    def test_from_invalid(self):
        self.assertRaises(ValueError,
                          groupcompress.GroupCompressBlock.from_bytes,
                          b'this is not a valid header')

    def test_from_bytes(self):
        content = (b'a tiny bit of content\n')
        z_content = zlib.compress(content)
        z_bytes = (
            b'gcb1z\n'  # group compress block v1 plain
            b'%d\n'  # Length of compressed content
            b'%d\n'  # Length of uncompressed content
            b'%s'   # Compressed content
            ) % (len(z_content), len(content), z_content)
        block = groupcompress.GroupCompressBlock.from_bytes(
            z_bytes)
        self.assertEqual(z_content, block._z_content)
        self.assertIs(None, block._content)
        self.assertEqual(len(z_content), block._z_content_length)
        self.assertEqual(len(content), block._content_length)
        block._ensure_content()
        self.assertEqual(z_content, block._z_content)
        self.assertEqual(content, block._content)
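
    # A sketch of hand-parsing the container built above, assuming exactly
    # the 'gcb1z' header layout the tests assert (label line, compressed
    # length, uncompressed length, then the zlib body):
    #
    #   def parse_gcb(data):
    #       label, z_len, c_len, z_body = data.split(b'\n', 3)
    #       assert label == b'gcb1z'
    #       content = zlib.decompress(z_body)
    #       assert len(z_body) == int(z_len) and len(content) == int(c_len)
    #       return content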
 
    def test_to_chunks(self):
        content_chunks = [b'this is some content\n',
                          b'this content will be compressed\n']
        content_len = sum(map(len, content_chunks))
        content = b''.join(content_chunks)
        gcb = groupcompress.GroupCompressBlock()
        gcb.set_chunked_content(content_chunks, content_len)
        total_len, block_chunks = gcb.to_chunks()
        block_bytes = b''.join(block_chunks)
        self.assertEqual(gcb._z_content_length, len(gcb._z_content))
        self.assertEqual(total_len, len(block_bytes))
        self.assertEqual(gcb._content_length, content_len)
        expected_header = (b'gcb1z\n'  # group compress block v1 zlib
                           b'%d\n'  # Length of compressed content
                           b'%d\n'  # Length of uncompressed content
                           ) % (gcb._z_content_length, gcb._content_length)
        # The first chunk should be the header chunk. It is small, fixed size,
        # and there is no compelling reason to split it up.
        self.assertEqual(expected_header, block_chunks[0])
        self.assertStartsWith(block_bytes, expected_header)
        remaining_bytes = block_bytes[len(expected_header):]
        raw_bytes = zlib.decompress(remaining_bytes)
        self.assertEqual(content, raw_bytes)

    def test_to_bytes(self):
        content = (b'this is some content\n'
                   b'this content will be compressed\n')
        gcb = groupcompress.GroupCompressBlock()
        gcb.set_content(content)
        data = gcb.to_bytes()
        self.assertEqual(gcb._z_content_length, len(gcb._z_content))
        self.assertEqual(gcb._content_length, len(content))
        expected_header = (b'gcb1z\n'  # group compress block v1 zlib
                           b'%d\n'  # Length of compressed content
                           b'%d\n'  # Length of uncompressed content
                           ) % (gcb._z_content_length, gcb._content_length)
        self.assertStartsWith(data, expected_header)
        remaining_bytes = data[len(expected_header):]
        raw_bytes = zlib.decompress(remaining_bytes)
        self.assertEqual(content, raw_bytes)

        # we should get the same results if using the chunked version
        gcb = groupcompress.GroupCompressBlock()
        gcb.set_chunked_content([b'this is some content\n'
                                 b'this content will be compressed\n'],
                                len(content))
        old_data = data
        data = gcb.to_bytes()
        self.assertEqual(old_data, data)

    def test_partial_decomp(self):
        content_chunks = []
        # We need a sufficient amount of data so that zlib.decompress has
        # partial decompression to work with. Most auto-generated data
        # compresses a bit too well, so we combine a sha hash with
        # compressible data.
        for i in range(2048):
            next_content = b'%d\nThis is a bit of duplicate text\n' % (i,)
            content_chunks.append(next_content)
            next_sha1 = osutils.sha_string(next_content)
            content_chunks.append(next_sha1 + b'\n')
        content = b''.join(content_chunks)
        self.assertEqual(158634, len(content))
        z_content = zlib.compress(content)
        self.assertEqual(57182, len(z_content))
        block = groupcompress.GroupCompressBlock()
        block._z_content_chunks = (z_content,)
        block._z_content_length = len(z_content)
        block._compressor_name = 'zlib'
        block._content_length = 158634
        self.assertIs(None, block._content)
        block._ensure_content(100)
        self.assertIsNot(None, block._content)
        # We have decompressed at least 100 bytes
        self.assertTrue(len(block._content) >= 100)
        # We have not decompressed the whole content
        self.assertTrue(len(block._content) < 158634)
        self.assertEqualDiff(content[:len(block._content)], block._content)
        # Ensuring content that we already have shouldn't cause any more data
        # to be extracted
        cur_len = len(block._content)
        block._ensure_content(cur_len - 10)
        self.assertEqual(cur_len, len(block._content))
        # Now we want a bit more content
        cur_len += 10
        block._ensure_content(cur_len)
        self.assertTrue(len(block._content) >= cur_len)
        self.assertTrue(len(block._content) < 158634)
        self.assertEqualDiff(content[:len(block._content)], block._content)
        # And now let's finish
        block._ensure_content(158634)
        self.assertEqualDiff(content, block._content)
        # And the decompressor is finalized
        self.assertIs(None, block._z_content_decompressor)
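
    # The partial reads exercised above can be mimicked with the
    # standard-library primitive that _ensure_content presumably builds on,
    # zlib.decompressobj and its max_length argument:
    #
    #   d = zlib.decompressobj()
    #   first = d.decompress(z_content, 100)   # at most 100 bytes out
    #   more = d.decompress(d.unconsumed_tail, 200)  # resume where we stopped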
 
    def test__ensure_all_content(self):
        content_chunks = []
        # We need a sufficient amount of data so that zlib.decompress has
        # partial decompression to work with. Most auto-generated data
        # compresses a bit too well, so we combine a sha hash with
        # compressible data.
        for i in range(2048):
            next_content = b'%d\nThis is a bit of duplicate text\n' % (i,)
            content_chunks.append(next_content)
            next_sha1 = osutils.sha_string(next_content)
            content_chunks.append(next_sha1 + b'\n')
        content = b''.join(content_chunks)
        self.assertEqual(158634, len(content))
        z_content = zlib.compress(content)
        self.assertEqual(57182, len(z_content))
        block = groupcompress.GroupCompressBlock()
        block._z_content_chunks = (z_content,)
        block._z_content_length = len(z_content)
        block._compressor_name = 'zlib'
        block._content_length = 158634
        self.assertIs(None, block._content)
        # The first _ensure_content gets all of the required data
        block._ensure_content(158634)
        self.assertEqualDiff(content, block._content)
        # And we should have released the _z_content_decompressor since it was
        # fully consumed
        self.assertIs(None, block._z_content_decompressor)

    def test__dump(self):
        dup_content = b'some duplicate content\nwhich is sufficiently long\n'
        key_to_text = {(b'1',): dup_content + b'1 unique\n',
                       (b'2',): dup_content + b'2 extra special\n'}
        locs, block = self.make_block(key_to_text)
        self.assertEqual([(b'f', len(key_to_text[(b'1',)])),
                          (b'd', 21, len(key_to_text[(b'2',)]),
                           [(b'c', 2, len(dup_content)),
                            (b'i', len(b'2 extra special\n'), b'')
                            ]),
                          ], block._dump())
 
506
 
 
507
 
 
508
class TestCaseWithGroupCompressVersionedFiles(
 
509
        tests.TestCaseWithMemoryTransport):
 
510
 
 
511
    def make_test_vf(self, create_graph, keylength=1, do_cleanup=True,
 
512
                     dir='.', inconsistency_fatal=True):
 
513
        t = self.get_transport(dir)
 
514
        t.ensure_base()
 
515
        vf = groupcompress.make_pack_factory(graph=create_graph,
 
516
                                             delta=False, keylength=keylength,
 
517
                                             inconsistency_fatal=inconsistency_fatal)(t)
 
518
        if do_cleanup:
 
519
            self.addCleanup(groupcompress.cleanup_pack_group, vf)
 
520
        return vf
 
521
 
 
522
 
 
523
class TestGroupCompressVersionedFiles(TestCaseWithGroupCompressVersionedFiles):
 
524
 
 
525
    def make_g_index(self, name, ref_lists=0, nodes=[]):
 
526
        builder = btree_index.BTreeBuilder(ref_lists)
 
527
        for node, references, value in nodes:
 
528
            builder.add_node(node, references, value)
 
529
        stream = builder.finish()
 
530
        trans = self.get_transport()
 
531
        size = trans.put_file(name, stream)
 
532
        return btree_index.BTreeGraphIndex(trans, name, size)
 
533
 
 
534
    def make_g_index_missing_parent(self):
 
535
        graph_index = self.make_g_index('missing_parent', 1,
 
536
                                        [((b'parent', ), b'2 78 2 10', ([],)),
 
537
                                         ((b'tip', ), b'2 78 2 10',
 
538
                                            ([(b'parent', ), (b'missing-parent', )],)),
 
539
                                         ])
 
540
        return graph_index
 
541
 
 
542
    def test_get_record_stream_as_requested(self):
 
543
        # Consider promoting 'as-requested' to general availability, and
 
544
        # make this a VF interface test
 
545
        vf = self.make_test_vf(False, dir='source')
 
546
        vf.add_lines((b'a',), (), [b'lines\n'])
 
547
        vf.add_lines((b'b',), (), [b'lines\n'])
 
548
        vf.add_lines((b'c',), (), [b'lines\n'])
 
549
        vf.add_lines((b'd',), (), [b'lines\n'])
 
550
        vf.writer.end()
 
551
        keys = [record.key for record in vf.get_record_stream(
 
552
            [(b'a',), (b'b',), (b'c',), (b'd',)],
 
553
            'as-requested', False)]
 
554
        self.assertEqual([(b'a',), (b'b',), (b'c',), (b'd',)], keys)
 
555
        keys = [record.key for record in vf.get_record_stream(
 
556
            [(b'b',), (b'a',), (b'd',), (b'c',)],
 
557
            'as-requested', False)]
 
558
        self.assertEqual([(b'b',), (b'a',), (b'd',), (b'c',)], keys)
 
559
 
 
560
        # It should work even after being repacked into another VF
 
561
        vf2 = self.make_test_vf(False, dir='target')
 
562
        vf2.insert_record_stream(vf.get_record_stream(
 
563
            [(b'b',), (b'a',), (b'd',), (b'c',)], 'as-requested', False))
 
564
        vf2.writer.end()
 
565
 
 
566
        keys = [record.key for record in vf2.get_record_stream(
 
567
            [(b'a',), (b'b',), (b'c',), (b'd',)],
 
568
            'as-requested', False)]
 
569
        self.assertEqual([(b'a',), (b'b',), (b'c',), (b'd',)], keys)
 
570
        keys = [record.key for record in vf2.get_record_stream(
 
571
            [(b'b',), (b'a',), (b'd',), (b'c',)],
 
572
            'as-requested', False)]
 
573
        self.assertEqual([(b'b',), (b'a',), (b'd',), (b'c',)], keys)
 
574
 
 
575
    def test_get_record_stream_max_bytes_to_index_default(self):
 
576
        vf = self.make_test_vf(True, dir='source')
 
577
        vf.add_lines((b'a',), (), [b'lines\n'])
 
578
        vf.writer.end()
 
579
        record = next(vf.get_record_stream([(b'a',)], 'unordered', True))
 
580
        self.assertEqual(vf._DEFAULT_COMPRESSOR_SETTINGS,
 
581
                         record._manager._get_compressor_settings())
 
582
 
 
583
    def test_get_record_stream_accesses_compressor_settings(self):
 
584
        vf = self.make_test_vf(True, dir='source')
 
585
        vf.add_lines((b'a',), (), [b'lines\n'])
 
586
        vf.writer.end()
 
587
        vf._max_bytes_to_index = 1234
 
588
        record = next(vf.get_record_stream([(b'a',)], 'unordered', True))
 
589
        self.assertEqual(dict(max_bytes_to_index=1234),
 
590
                         record._manager._get_compressor_settings())
 
591
 
 
592
    @staticmethod
 
593
    def grouped_stream(revision_ids, first_parents=()):
 
594
        parents = first_parents
 
595
        for revision_id in revision_ids:
 
596
            key = (revision_id,)
 
597
            record = versionedfile.FulltextContentFactory(
 
598
                key, parents, None,
 
599
                b'some content that is\n'
 
600
                b'identical except for\n'
 
601
                b'revision_id:%s\n' % (revision_id,))
 
602
            yield record
 
603
            parents = (key,)
 
604
 
 
605
    def test_insert_record_stream_reuses_blocks(self):
 
606
        vf = self.make_test_vf(True, dir='source')
 
607
        # One group, a-d
 
608
        vf.insert_record_stream(self.grouped_stream([b'a', b'b', b'c', b'd']))
 
609
        # Second group, e-h
 
610
        vf.insert_record_stream(self.grouped_stream(
 
611
            [b'e', b'f', b'g', b'h'], first_parents=((b'd',),)))
 
612
        block_bytes = {}
 
613
        stream = vf.get_record_stream(
 
614
            [(r.encode(),) for r in 'abcdefgh'], 'unordered', False)
 
615
        num_records = 0
 
616
        for record in stream:
 
617
            if record.key in [(b'a',), (b'e',)]:
 
618
                self.assertEqual('groupcompress-block', record.storage_kind)
 
619
            else:
 
620
                self.assertEqual('groupcompress-block-ref',
 
621
                                 record.storage_kind)
 
622
            block_bytes[record.key] = record._manager._block._z_content
 
623
            num_records += 1
 
624
        self.assertEqual(8, num_records)
 
625
        for r in 'abcd':
 
626
            key = (r.encode(),)
 
627
            self.assertIs(block_bytes[key], block_bytes[(b'a',)])
 
628
            self.assertNotEqual(block_bytes[key], block_bytes[(b'e',)])
 
629
        for r in 'efgh':
 
630
            key = (r.encode(),)
 
631
            self.assertIs(block_bytes[key], block_bytes[(b'e',)])
 
632
            self.assertNotEqual(block_bytes[key], block_bytes[(b'a',)])
 
633
        # Now copy the blocks into another vf, and ensure that the blocks are
 
634
        # preserved without creating new entries
 
635
        vf2 = self.make_test_vf(True, dir='target')
 
636
        keys = [(r.encode(),) for r in 'abcdefgh']
 
637
        # ordering in 'groupcompress' order, should actually swap the groups in
 
638
        # the target vf, but the groups themselves should not be disturbed.
 
639
 
 
640
        def small_size_stream():
 
641
            for record in vf.get_record_stream(keys, 'groupcompress', False):
 
642
                record._manager._full_enough_block_size = \
 
643
                    record._manager._block._content_length
 
644
                yield record
 
645
 
 
646
        vf2.insert_record_stream(small_size_stream())
 
647
        stream = vf2.get_record_stream(keys, 'groupcompress', False)
 
648
        vf2.writer.end()
 
649
        num_records = 0
 
650
        for record in stream:
 
651
            num_records += 1
 
652
            self.assertEqual(block_bytes[record.key],
 
653
                             record._manager._block._z_content)
 
654
        self.assertEqual(8, num_records)
 
    def test_insert_record_stream_packs_on_the_fly(self):
        vf = self.make_test_vf(True, dir='source')
        # One group, a-d
        vf.insert_record_stream(self.grouped_stream([b'a', b'b', b'c', b'd']))
        # Second group, e-h
        vf.insert_record_stream(self.grouped_stream(
            [b'e', b'f', b'g', b'h'], first_parents=((b'd',),)))
        # Now copy the blocks into another vf, and see that the
        # insert_record_stream rebuilt a new block on-the-fly because of
        # under-utilization
        vf2 = self.make_test_vf(True, dir='target')
        keys = [(r.encode(),) for r in 'abcdefgh']
        vf2.insert_record_stream(vf.get_record_stream(
            keys, 'groupcompress', False))
        stream = vf2.get_record_stream(keys, 'groupcompress', False)
        vf2.writer.end()
        num_records = 0
        # All of the records should be recombined into a single block
        block = None
        for record in stream:
            num_records += 1
            if block is None:
                block = record._manager._block
            else:
                self.assertIs(block, record._manager._block)
        self.assertEqual(8, num_records)

    def test__insert_record_stream_no_reuse_block(self):
        vf = self.make_test_vf(True, dir='source')
        # One group, a-d
        vf.insert_record_stream(self.grouped_stream([b'a', b'b', b'c', b'd']))
        # Second group, e-h
        vf.insert_record_stream(self.grouped_stream(
            [b'e', b'f', b'g', b'h'], first_parents=((b'd',),)))
        vf.writer.end()
        keys = [(r.encode(),) for r in 'abcdefgh']
        self.assertEqual(8, len(list(
            vf.get_record_stream(keys, 'unordered', False))))
        # Now copy the blocks into another vf, and ensure that the blocks are
        # preserved without creating new entries
        vf2 = self.make_test_vf(True, dir='target')
        # Ordering in 'groupcompress' order should actually swap the groups in
        # the target vf, but the groups themselves should not be disturbed.
        list(vf2._insert_record_stream(vf.get_record_stream(
            keys, 'groupcompress', False),
            reuse_blocks=False))
        vf2.writer.end()
        # After inserting with reuse_blocks=False, we should have everything in
        # a single new block.
        stream = vf2.get_record_stream(keys, 'groupcompress', False)
        block = None
        for record in stream:
            if block is None:
                block = record._manager._block
            else:
                self.assertIs(block, record._manager._block)

    def test_add_missing_noncompression_parent_unvalidated_index(self):
        unvalidated = self.make_g_index_missing_parent()
        combined = _mod_index.CombinedGraphIndex([unvalidated])
        index = groupcompress._GCGraphIndex(combined,
                                            is_locked=lambda: True, parents=True,
                                            track_external_parent_refs=True)
        index.scan_unvalidated_index(unvalidated)
        self.assertEqual(
            frozenset([(b'missing-parent',)]), index.get_missing_parents())

    def test_track_external_parent_refs(self):
        g_index = self.make_g_index('empty', 1, [])
        mod_index = btree_index.BTreeBuilder(1, 1)
        combined = _mod_index.CombinedGraphIndex([g_index, mod_index])
        index = groupcompress._GCGraphIndex(combined,
                                            is_locked=lambda: True, parents=True,
                                            add_callback=mod_index.add_nodes,
                                            track_external_parent_refs=True)
        index.add_records([
            ((b'new-key',), b'2 10 2 10', [((b'parent-1',), (b'parent-2',))])])
        self.assertEqual(
            frozenset([(b'parent-1',), (b'parent-2',)]),
            index.get_missing_parents())

    def make_source_with_b(self, a_parent, path):
        source = self.make_test_vf(True, dir=path)
        source.add_lines((b'a',), (), [b'lines\n'])
        if a_parent:
            b_parents = ((b'a',),)
        else:
            b_parents = ()
        source.add_lines((b'b',), b_parents, [b'lines\n'])
        return source

    def do_inconsistent_inserts(self, inconsistency_fatal):
        target = self.make_test_vf(True, dir='target',
                                   inconsistency_fatal=inconsistency_fatal)
        for x in range(2):
            source = self.make_source_with_b(x == 1, 'source%s' % x)
            target.insert_record_stream(source.get_record_stream(
                [(b'b',)], 'unordered', False))

    def test_inconsistent_redundant_inserts_warn(self):
        """Should not insert a record that is already present."""
        warnings = []

        def warning(template, args):
            warnings.append(template % args)
        _trace_warning = trace.warning
        trace.warning = warning
        try:
            self.do_inconsistent_inserts(inconsistency_fatal=False)
        finally:
            trace.warning = _trace_warning
        self.assertContainsRe(
            "\n".join(warnings),
            r"^inconsistent details in skipped record: \(b?'b',\)"
            r" \(b?'42 32 0 8', \(\(\),\)\)"
            r" \(b?'74 32 0 8', \(\(\(b?'a',\),\),\)\)$")

    def test_inconsistent_redundant_inserts_raises(self):
        e = self.assertRaises(knit.KnitCorrupt, self.do_inconsistent_inserts,
                              inconsistency_fatal=True)
        self.assertContainsRe(str(e), r"Knit.* corrupt: inconsistent details"
                              r" in add_records:"
                              r" \(b?'b',\) \(b?'42 32 0 8', \(\(\),\)\)"
                              r" \(b?'74 32 0 8', \(\(\(b?'a',\),\),\)\)")

    def test_clear_cache(self):
        vf = self.make_source_with_b(True, 'source')
        vf.writer.end()
        for record in vf.get_record_stream([(b'a',), (b'b',)], 'unordered',
                                           True):
            pass
        self.assertTrue(len(vf._group_cache) > 0)
        vf.clear_cache()
        self.assertEqual(0, len(vf._group_cache))
 

class TestGroupCompressConfig(tests.TestCaseWithTransport):

    def make_test_vf(self):
        t = self.get_transport('.')
        t.ensure_base()
        factory = groupcompress.make_pack_factory(graph=True,
                                                  delta=False, keylength=1, inconsistency_fatal=True)
        vf = factory(t)
        self.addCleanup(groupcompress.cleanup_pack_group, vf)
        return vf

    def test_max_bytes_to_index_default(self):
        vf = self.make_test_vf()
        gc = vf._make_group_compressor()
        self.assertEqual(vf._DEFAULT_MAX_BYTES_TO_INDEX,
                         vf._max_bytes_to_index)
        if isinstance(gc, groupcompress.PyrexGroupCompressor):
            self.assertEqual(vf._DEFAULT_MAX_BYTES_TO_INDEX,
                             gc._delta_index._max_bytes_to_index)

    def test_max_bytes_to_index_in_config(self):
        c = config.GlobalConfig()
        c.set_user_option('bzr.groupcompress.max_bytes_to_index', '10000')
        vf = self.make_test_vf()
        gc = vf._make_group_compressor()
        self.assertEqual(10000, vf._max_bytes_to_index)
        if isinstance(gc, groupcompress.PyrexGroupCompressor):
            self.assertEqual(10000, gc._delta_index._max_bytes_to_index)

    def test_max_bytes_to_index_bad_config(self):
        c = config.GlobalConfig()
        c.set_user_option('bzr.groupcompress.max_bytes_to_index', 'boogah')
        vf = self.make_test_vf()
        # TODO: This is triggering a warning; we might want to trap it and
        #       make sure it is readable.
        gc = vf._make_group_compressor()
        self.assertEqual(vf._DEFAULT_MAX_BYTES_TO_INDEX,
                         vf._max_bytes_to_index)
        if isinstance(gc, groupcompress.PyrexGroupCompressor):
            self.assertEqual(vf._DEFAULT_MAX_BYTES_TO_INDEX,
                             gc._delta_index._max_bytes_to_index)


class StubGCVF(object):
    def __init__(self, canned_get_blocks=None):
        self._group_cache = {}
        self._canned_get_blocks = canned_get_blocks or []

    def _get_blocks(self, read_memos):
        return iter(self._canned_get_blocks)


class Test_BatchingBlockFetcher(TestCaseWithGroupCompressVersionedFiles):
    """Simple whitebox unit tests for _BatchingBlockFetcher."""

    def test_add_key_new_read_memo(self):
        """Adding a key with an uncached read_memo new to this batch adds that
        read_memo to the list of memos to fetch.
        """
        # locations are: index_memo, ignored, parents, ignored
        # where index_memo is: (idx, offset, len, factory_start, factory_end)
        # and (idx, offset, size) is known as the 'read_memo', identifying the
        # raw bytes needed.
        read_memo = ('fake index', 100, 50)
        locations = {
            ('key',): (read_memo + (None, None), None, None, None)}
        batcher = groupcompress._BatchingBlockFetcher(StubGCVF(), locations)
        total_size = batcher.add_key(('key',))
        self.assertEqual(50, total_size)
        self.assertEqual([('key',)], batcher.keys)
        self.assertEqual([read_memo], batcher.memos_to_get)

    def test_add_key_duplicate_read_memo(self):
        """read_memos that occur multiple times in a batch will only be fetched
        once.
        """
        read_memo = ('fake index', 100, 50)
        # Two keys, both sharing the same read memo (but different overall
        # index_memos).
        locations = {
            ('key1',): (read_memo + (0, 1), None, None, None),
            ('key2',): (read_memo + (1, 2), None, None, None)}
        batcher = groupcompress._BatchingBlockFetcher(StubGCVF(), locations)
        total_size = batcher.add_key(('key1',))
        total_size = batcher.add_key(('key2',))
        self.assertEqual(50, total_size)
        self.assertEqual([('key1',), ('key2',)], batcher.keys)
        self.assertEqual([read_memo], batcher.memos_to_get)

    def test_add_key_cached_read_memo(self):
        """Adding a key with a cached read_memo will not cause that read_memo
        to be added to the list to fetch.
        """
        read_memo = ('fake index', 100, 50)
        gcvf = StubGCVF()
        gcvf._group_cache[read_memo] = 'fake block'
        locations = {
            ('key',): (read_memo + (None, None), None, None, None)}
        batcher = groupcompress._BatchingBlockFetcher(gcvf, locations)
        total_size = batcher.add_key(('key',))
        self.assertEqual(0, total_size)
        self.assertEqual([('key',)], batcher.keys)
        self.assertEqual([], batcher.memos_to_get)

    def test_yield_factories_empty(self):
        """An empty batch yields no factories."""
        batcher = groupcompress._BatchingBlockFetcher(StubGCVF(), {})
        self.assertEqual([], list(batcher.yield_factories()))

    def test_yield_factories_calls_get_blocks(self):
        """Uncached memos are retrieved via get_blocks."""
        read_memo1 = ('fake index', 100, 50)
        read_memo2 = ('fake index', 150, 40)
        gcvf = StubGCVF(
            canned_get_blocks=[
                (read_memo1, groupcompress.GroupCompressBlock()),
                (read_memo2, groupcompress.GroupCompressBlock())])
        locations = {
            ('key1',): (read_memo1 + (0, 0), None, None, None),
            ('key2',): (read_memo2 + (0, 0), None, None, None)}
        batcher = groupcompress._BatchingBlockFetcher(gcvf, locations)
        batcher.add_key(('key1',))
        batcher.add_key(('key2',))
        factories = list(batcher.yield_factories(full_flush=True))
        self.assertLength(2, factories)
        keys = [f.key for f in factories]
        kinds = [f.storage_kind for f in factories]
        self.assertEqual([('key1',), ('key2',)], keys)
        self.assertEqual(['groupcompress-block', 'groupcompress-block'], kinds)

    def test_yield_factories_flushing(self):
        """yield_factories holds back on yielding results from the final block
        unless passed full_flush=True.
        """
        fake_block = groupcompress.GroupCompressBlock()
        read_memo = ('fake index', 100, 50)
        gcvf = StubGCVF()
        gcvf._group_cache[read_memo] = fake_block
        locations = {
            ('key',): (read_memo + (0, 0), None, None, None)}
        batcher = groupcompress._BatchingBlockFetcher(gcvf, locations)
        batcher.add_key(('key',))
        self.assertEqual([], list(batcher.yield_factories()))
        factories = list(batcher.yield_factories(full_flush=True))
        self.assertLength(1, factories)
        self.assertEqual(('key',), factories[0].key)
        self.assertEqual('groupcompress-block', factories[0].storage_kind)
 

class TestLazyGroupCompress(tests.TestCaseWithTransport):

    _texts = {
        (b'key1',): b"this is a text\n"
        b"with a reasonable amount of compressible bytes\n"
        b"which can be shared between various other texts\n",
        (b'key2',): b"another text\n"
        b"with a reasonable amount of compressible bytes\n"
        b"which can be shared between various other texts\n",
        (b'key3',): b"yet another text which won't be extracted\n"
        b"with a reasonable amount of compressible bytes\n"
        b"which can be shared between various other texts\n",
        (b'key4',): b"this will be extracted\n"
        b"but references most of its bytes from\n"
        b"yet another text which won't be extracted\n"
        b"with a reasonable amount of compressible bytes\n"
        b"which can be shared between various other texts\n",
    }

    def make_block(self, key_to_text):
        """Create a GroupCompressBlock, filling it with the given texts."""
        compressor = groupcompress.GroupCompressor()
        start = 0
        for key in sorted(key_to_text):
            compressor.compress(
                key, [key_to_text[key]], len(key_to_text[key]), None)
        locs = dict((key, (start, end)) for key, (start, _, end, _)
                    in compressor.labels_deltas.items())
        block = compressor.flush()
        raw_bytes = block.to_bytes()
        return locs, groupcompress.GroupCompressBlock.from_bytes(raw_bytes)

    def add_key_to_manager(self, key, locations, block, manager):
        start, end = locations[key]
        manager.add_factory(key, (), start, end)

    def make_block_and_full_manager(self, texts):
        locations, block = self.make_block(texts)
        manager = groupcompress._LazyGroupContentManager(block)
        for key in sorted(texts):
            self.add_key_to_manager(key, locations, block, manager)
        return block, manager

    def test_get_fulltexts(self):
        locations, block = self.make_block(self._texts)
        manager = groupcompress._LazyGroupContentManager(block)
        self.add_key_to_manager((b'key1',), locations, block, manager)
        self.add_key_to_manager((b'key2',), locations, block, manager)
        result_order = []
        for record in manager.get_record_stream():
            result_order.append(record.key)
            text = self._texts[record.key]
            self.assertEqual(text, record.get_bytes_as('fulltext'))
        self.assertEqual([(b'key1',), (b'key2',)], result_order)

        # If we build the manager in the opposite order, we should get them
        # back in the opposite order
        manager = groupcompress._LazyGroupContentManager(block)
        self.add_key_to_manager((b'key2',), locations, block, manager)
        self.add_key_to_manager((b'key1',), locations, block, manager)
        result_order = []
        for record in manager.get_record_stream():
            result_order.append(record.key)
            text = self._texts[record.key]
            self.assertEqual(text, record.get_bytes_as('fulltext'))
        self.assertEqual([(b'key2',), (b'key1',)], result_order)

    def test__wire_bytes_no_keys(self):
        locations, block = self.make_block(self._texts)
        manager = groupcompress._LazyGroupContentManager(block)
        wire_bytes = manager._wire_bytes()
        block_length = len(block.to_bytes())
        # We should have triggered a strip, since we aren't using any content
        stripped_block = manager._block.to_bytes()
        self.assertTrue(block_length > len(stripped_block))
        empty_z_header = zlib.compress(b'')
        self.assertEqual(b'groupcompress-block\n'
                         b'8\n'  # len(compress(''))
                         b'0\n'  # len('')
                         b'%d\n'  # compressed block len
                         b'%s'  # zheader
                         b'%s'  # block
                         % (len(stripped_block), empty_z_header,
                            stripped_block),
                         wire_bytes)

    def test__wire_bytes(self):
        locations, block = self.make_block(self._texts)
        manager = groupcompress._LazyGroupContentManager(block)
        self.add_key_to_manager((b'key1',), locations, block, manager)
        self.add_key_to_manager((b'key4',), locations, block, manager)
        block_bytes = block.to_bytes()
        wire_bytes = manager._wire_bytes()
        (storage_kind, z_header_len, header_len,
         block_len, rest) = wire_bytes.split(b'\n', 4)
        z_header_len = int(z_header_len)
        header_len = int(header_len)
        block_len = int(block_len)
        self.assertEqual(b'groupcompress-block', storage_kind)
        self.assertEqual(34, z_header_len)
        self.assertEqual(26, header_len)
        self.assertEqual(len(block_bytes), block_len)
        z_header = rest[:z_header_len]
        header = zlib.decompress(z_header)
        self.assertEqual(header_len, len(header))
        entry1 = locations[(b'key1',)]
        entry4 = locations[(b'key4',)]
        self.assertEqualDiff(b'key1\n'
                             b'\n'  # no parents
                             b'%d\n'  # start offset
                             b'%d\n'  # end offset
                             b'key4\n'
                             b'\n'
                             b'%d\n'
                             b'%d\n'
                             % (entry1[0], entry1[1],
                                entry4[0], entry4[1]),
                             header)
        z_block = rest[z_header_len:]
        self.assertEqual(block_bytes, z_block)
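
    # A sketch of splitting the wire format asserted above, assuming the
    # layout 'groupcompress-block\n<z_header_len>\n<header_len>\n'
    # '<block_len>\n<z_header><block>' with decimal ASCII lengths:
    #
    #   kind, z_hlen, hlen, blen, rest = wire_bytes.split(b'\n', 4)
    #   header = zlib.decompress(rest[:int(z_hlen)])
    #   block = rest[int(z_hlen):]
    #   assert kind == b'groupcompress-block' and len(block) == int(blen)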
 
    def test_from_bytes(self):
        locations, block = self.make_block(self._texts)
        manager = groupcompress._LazyGroupContentManager(block)
        self.add_key_to_manager((b'key1',), locations, block, manager)
        self.add_key_to_manager((b'key4',), locations, block, manager)
        wire_bytes = manager._wire_bytes()
        self.assertStartsWith(wire_bytes, b'groupcompress-block\n')
        manager = groupcompress._LazyGroupContentManager.from_bytes(wire_bytes)
        self.assertIsInstance(manager, groupcompress._LazyGroupContentManager)
        self.assertEqual(2, len(manager._factories))
        self.assertEqual(block._z_content, manager._block._z_content)
        result_order = []
        for record in manager.get_record_stream():
            result_order.append(record.key)
            text = self._texts[record.key]
            self.assertEqual(text, record.get_bytes_as('fulltext'))
        self.assertEqual([(b'key1',), (b'key4',)], result_order)

    def test__check_rebuild_no_changes(self):
        block, manager = self.make_block_and_full_manager(self._texts)
        manager._check_rebuild_block()
        self.assertIs(block, manager._block)

    def test__check_rebuild_only_one(self):
        locations, block = self.make_block(self._texts)
        manager = groupcompress._LazyGroupContentManager(block)
        # Request just the first key, which should trigger a 'strip' action
        self.add_key_to_manager((b'key1',), locations, block, manager)
        manager._check_rebuild_block()
        self.assertIsNot(block, manager._block)
        self.assertTrue(block._content_length > manager._block._content_length)
        # We should still be able to get the content out of this block, though
        # it should only have 1 entry
        for record in manager.get_record_stream():
            self.assertEqual((b'key1',), record.key)
            self.assertEqual(self._texts[record.key],
                             record.get_bytes_as('fulltext'))

    def test__check_rebuild_middle(self):
        locations, block = self.make_block(self._texts)
        manager = groupcompress._LazyGroupContentManager(block)
        # Requesting a small key in the middle should trigger a 'rebuild'
        self.add_key_to_manager((b'key4',), locations, block, manager)
        manager._check_rebuild_block()
        self.assertIsNot(block, manager._block)
        self.assertTrue(block._content_length > manager._block._content_length)
        for record in manager.get_record_stream():
            self.assertEqual((b'key4',), record.key)
            self.assertEqual(self._texts[record.key],
                             record.get_bytes_as('fulltext'))

    def test_manager_default_compressor_settings(self):
        locations, old_block = self.make_block(self._texts)
        manager = groupcompress._LazyGroupContentManager(old_block)
        gcvf = groupcompress.GroupCompressVersionedFiles
        # It doesn't greedily evaluate _max_bytes_to_index
        self.assertIs(None, manager._compressor_settings)
        self.assertEqual(gcvf._DEFAULT_COMPRESSOR_SETTINGS,
                         manager._get_compressor_settings())

    def test_manager_custom_compressor_settings(self):
        locations, old_block = self.make_block(self._texts)
        called = []

        def compressor_settings():
            called.append('called')
            return (10,)
        manager = groupcompress._LazyGroupContentManager(old_block,
                                                         get_compressor_settings=compressor_settings)
        gcvf = groupcompress.GroupCompressVersionedFiles
        # It doesn't greedily evaluate compressor_settings
        self.assertIs(None, manager._compressor_settings)
        self.assertEqual((10,), manager._get_compressor_settings())
        self.assertEqual((10,), manager._get_compressor_settings())
        self.assertEqual((10,), manager._compressor_settings)
        # Only called 1 time
        self.assertEqual(['called'], called)

    def test__rebuild_handles_compressor_settings(self):
        # Note: issubclass is used here; the original's isinstance check on
        # two classes would always be False and skip the test unconditionally.
        if not issubclass(groupcompress.GroupCompressor,
                          groupcompress.PyrexGroupCompressor):
            raise tests.TestNotApplicable('pure-python compressor'
                                          ' does not handle compressor_settings')
        locations, old_block = self.make_block(self._texts)
        manager = groupcompress._LazyGroupContentManager(old_block,
                                                         get_compressor_settings=lambda: dict(max_bytes_to_index=32))
        gc = manager._make_group_compressor()
        self.assertEqual(32, gc._delta_index._max_bytes_to_index)
        self.add_key_to_manager((b'key3',), locations, old_block, manager)
        self.add_key_to_manager((b'key4',), locations, old_block, manager)
        action, last_byte, total_bytes = manager._check_rebuild_action()
        self.assertEqual('rebuild', action)
        manager._rebuild_block()
        new_block = manager._block
        self.assertIsNot(old_block, new_block)
        # Because of the new max_bytes_to_index, we do a poor job of
        # rebuilding. This is a side-effect of the change, but at least it does
        # show the setting had an effect.
        self.assertTrue(old_block._content_length < new_block._content_length)

    def test_check_is_well_utilized_all_keys(self):
        block, manager = self.make_block_and_full_manager(self._texts)
        self.assertFalse(manager.check_is_well_utilized())
        # Though we can fake it by changing the recommended minimum size
        manager._full_enough_block_size = block._content_length
        self.assertTrue(manager.check_is_well_utilized())
        # Setting it just above causes it to fail
        manager._full_enough_block_size = block._content_length + 1
        self.assertFalse(manager.check_is_well_utilized())
        # Setting the mixed-block size doesn't do anything, because the content
        # is considered to not be 'mixed'
        manager._full_enough_mixed_block_size = block._content_length
        self.assertFalse(manager.check_is_well_utilized())

    def test_check_is_well_utilized_mixed_keys(self):
        texts = {}
        f1k1 = (b'f1', b'k1')
        f1k2 = (b'f1', b'k2')
        f2k1 = (b'f2', b'k1')
        f2k2 = (b'f2', b'k2')
        texts[f1k1] = self._texts[(b'key1',)]
        texts[f1k2] = self._texts[(b'key2',)]
        texts[f2k1] = self._texts[(b'key3',)]
        texts[f2k2] = self._texts[(b'key4',)]
        block, manager = self.make_block_and_full_manager(texts)
        self.assertFalse(manager.check_is_well_utilized())
        manager._full_enough_block_size = block._content_length
        self.assertTrue(manager.check_is_well_utilized())
        manager._full_enough_block_size = block._content_length + 1
        self.assertFalse(manager.check_is_well_utilized())
        manager._full_enough_mixed_block_size = block._content_length
        self.assertTrue(manager.check_is_well_utilized())

    def test_check_is_well_utilized_partial_use(self):
        locations, block = self.make_block(self._texts)
        manager = groupcompress._LazyGroupContentManager(block)
        manager._full_enough_block_size = block._content_length
        self.add_key_to_manager((b'key1',), locations, block, manager)
        self.add_key_to_manager((b'key2',), locations, block, manager)
        # Just using the content from key1 and 2 is not enough to be considered
        # 'complete'
        self.assertFalse(manager.check_is_well_utilized())
        # However, if we add one more key (key4), then we have enough, as we
        # only require 75% consumption
        self.add_key_to_manager((b'key4',), locations, block, manager)
        self.assertTrue(manager.check_is_well_utilized())


class Test_GCBuildDetails(tests.TestCase):

    def test_acts_like_tuple(self):
        # _GCBuildDetails inlines some of the data that used to be spread out
        # across a bunch of tuples
        bd = groupcompress._GCBuildDetails((('parent1',), ('parent2',)),
                                           ('INDEX', 10, 20, 0, 5))
        self.assertEqual(4, len(bd))
        self.assertEqual(('INDEX', 10, 20, 0, 5), bd[0])
        self.assertEqual(None, bd[1])  # Compression Parent is always None
        self.assertEqual((('parent1',), ('parent2',)), bd[2])
        self.assertEqual(('group', None), bd[3])  # Record details

    def test__repr__(self):
        bd = groupcompress._GCBuildDetails((('parent1',), ('parent2',)),
                                           ('INDEX', 10, 20, 0, 5))
        self.assertEqual("_GCBuildDetails(('INDEX', 10, 20, 0, 5),"
                         " (('parent1',), ('parent2',)))",
                         repr(bd))