# Copyright (C) 2008-2011 Canonical Ltd
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA

"""Tests for group compression."""

import zlib

from .. import (
    btree_index,
    config,
    groupcompress,
    index as _mod_index,
    knit,
    osutils,
    tests,
    trace,
    versionedfile,
    )
from ..osutils import sha_string
from .test__groupcompress import compiled_groupcompress_feature
from .scenarios import load_tests_apply_scenarios


def group_compress_implementation_scenarios():
    scenarios = [
        ('python', {'compressor': groupcompress.PythonGroupCompressor}),
        ]
    if compiled_groupcompress_feature.available():
        scenarios.append(('C',
                          {'compressor': groupcompress.PyrexGroupCompressor}))
    return scenarios


load_tests = load_tests_apply_scenarios
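# load_tests_apply_scenarios multiplies every test in this module by the
# 'scenarios' attribute on its class, so TestAllGroupCompressors below is run
# once per available compressor implementation.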


class TestGroupCompressor(tests.TestCase):

    def _chunks_to_repr_lines(self, chunks):
        return '\n'.join(map(repr, b''.join(chunks).split(b'\n')))

    def assertEqualDiffEncoded(self, expected, actual):
        """Compare the actual content to the expected content.

        :param expected: A group of chunks that we expect to see
        :param actual: The measured 'chunks'

        We will transform the chunks back into lines, and then run 'repr()'
        over them to handle non-ascii characters.
        """
        self.assertEqualDiff(self._chunks_to_repr_lines(expected),
                             self._chunks_to_repr_lines(actual))


class TestAllGroupCompressors(TestGroupCompressor):
    """Tests for GroupCompressor"""

    scenarios = group_compress_implementation_scenarios()
    compressor = None  # Set by scenario

    def test_empty_delta(self):
        compressor = self.compressor()
        self.assertEqual([], compressor.chunks)

    def test_one_nosha_delta(self):
        compressor = self.compressor()
        sha1, start_point, end_point, _ = compressor.compress(('label',),
            b'strange\ncommon\n', None)
        self.assertEqual(sha_string(b'strange\ncommon\n'), sha1)
        expected_lines = b'f\x0fstrange\ncommon\n'
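        # (b'f' marks the entry as a stored fulltext; \x0f == 15 is the length
        # of the text that follows.)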
        self.assertEqual(expected_lines, b''.join(compressor.chunks))
        self.assertEqual(0, start_point)
        self.assertEqual(len(expected_lines), end_point)

    def test_empty_content(self):
        compressor = self.compressor()
        # Adding empty bytes should return the 'null' record
        sha1, start_point, end_point, kind = compressor.compress(('empty',),
            b'', None)
        self.assertEqual(0, start_point)
        self.assertEqual(0, end_point)
        self.assertEqual('fulltext', kind)
        self.assertEqual(groupcompress._null_sha1, sha1)
        self.assertEqual(0, compressor.endpoint)
        self.assertEqual([], compressor.chunks)
        # Even after adding some content
        compressor.compress(('content',), b'some\nbytes\n', None)
        self.assertTrue(compressor.endpoint > 0)
        sha1, start_point, end_point, kind = compressor.compress(('empty2',),
            b'', None)
        self.assertEqual(0, start_point)
        self.assertEqual(0, end_point)
        self.assertEqual('fulltext', kind)
        self.assertEqual(groupcompress._null_sha1, sha1)

    def test_extract_from_compressor(self):
        # Knit fetching will try to reconstruct texts locally which results in
        # reading something that is in the compressor stream already.
        compressor = self.compressor()
        sha1_1, _, _, _ = compressor.compress(('label',),
            b'strange\ncommon long line\nthat needs a 16 byte match\n', None)
        expected_lines = list(compressor.chunks)
        sha1_2, _, end_point, _ = compressor.compress(('newlabel',),
            b'common long line\nthat needs a 16 byte match\ndifferent\n', None)
        self.assertEqual((b'strange\ncommon long line\n'
                          b'that needs a 16 byte match\n', sha1_1),
                         compressor.extract(('label',)))
        self.assertEqual((b'common long line\nthat needs a 16 byte match\n'
                          b'different\n', sha1_2),
                         compressor.extract(('newlabel',)))

    def test_pop_last(self):
        compressor = self.compressor()
        _, _, _, _ = compressor.compress(('key1',),
            b'some text\nfor the first entry\n', None)
        expected_lines = list(compressor.chunks)
        _, _, _, _ = compressor.compress(('key2',),
            b'some text\nfor the second entry\n', None)
        compressor.pop_last()
        self.assertEqual(expected_lines, compressor.chunks)


class TestPyrexGroupCompressor(TestGroupCompressor):

    _test_needs_features = [compiled_groupcompress_feature]
    compressor = groupcompress.PyrexGroupCompressor

    def test_stats(self):
        compressor = self.compressor()
        compressor.compress(('label',),
                            b'common very very long line\n'
                            b'plus more text\n', None)
        compressor.compress(('newlabel',),
                            b'common very very long line\n'
                            b'moredifferent\n', None)
        compressor.compress(('label3',),
                            b'common very very long line\n'
                            b'moredifferent\n', None)
        self.assertAlmostEqual(1.9, compressor.ratio(), 1)

    def test_two_nosha_delta(self):
        compressor = self.compressor()
        sha1_1, _, _, _ = compressor.compress(('label',),
            b'strange\ncommon long line\nthat needs a 16 byte match\n', None)
        expected_lines = list(compressor.chunks)
        sha1_2, start_point, end_point, _ = compressor.compress(('newlabel',),
            b'common long line\nthat needs a 16 byte match\ndifferent\n', None)
        self.assertEqual(sha_string(b'common long line\n'
                                    b'that needs a 16 byte match\n'
                                    b'different\n'), sha1_2)
        expected_lines.extend([
            # 'delta', delta length
            # source and target length
            # copy the line common
            b'\x91\x0a\x2c',  # copy, offset 0x0a, len 0x2c
            # add the line different, and the trailing newline
            b'\x0adifferent\n',  # insert 10 bytes
            ])
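        # (The control bytes above use the groupcompress delta encoding: a
        # command byte with the high bit set, e.g. 0x91, is a copy whose low
        # bits select the offset/length bytes that follow, while a command
        # byte below 0x80, e.g. 0x0a, inserts that many literal bytes.)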
        self.assertEqualDiffEncoded(expected_lines, compressor.chunks)
        self.assertEqual(sum(map(len, expected_lines)), end_point)

    def test_three_nosha_delta(self):
        # The first interesting test: make a change that should use lines from
        # both parents.
        compressor = self.compressor()
        sha1_1, _, _, _ = compressor.compress(('label',),
            b'strange\ncommon very very long line\nwith some extra text\n', None)
        sha1_2, _, _, _ = compressor.compress(('newlabel',),
            b'different\nmoredifferent\nand then some more\n', None)
        expected_lines = list(compressor.chunks)
        sha1_3, start_point, end_point, _ = compressor.compress(('label3',),
            b'new\ncommon very very long line\nwith some extra text\n'
            b'different\nmoredifferent\nand then some more\n',
            None)
        self.assertEqual(
            sha_string(b'new\ncommon very very long line\nwith some extra text\n'
                       b'different\nmoredifferent\nand then some more\n'),
            sha1_3)
        expected_lines.extend([
            # 'delta', delta length
            # source and target length
            # Copy of first parent 'common' range
            b'\x91\x09\x31'  # copy, offset 0x09, 0x31 bytes
            # Copy of second parent 'different' range
            b'\x91\x3c\x2b'  # copy, offset 0x3c, 0x2b bytes
            ])
        self.assertEqualDiffEncoded(expected_lines, compressor.chunks)
        self.assertEqual(sum(map(len, expected_lines)), end_point)


class TestPythonGroupCompressor(TestGroupCompressor):

    compressor = groupcompress.PythonGroupCompressor

    def test_stats(self):
        compressor = self.compressor()
        compressor.compress(('label',),
                            b'common very very long line\n'
                            b'plus more text\n', None)
        compressor.compress(('newlabel',),
                            b'common very very long line\n'
                            b'moredifferent\n', None)
        compressor.compress(('label3',),
                            b'common very very long line\n'
                            b'moredifferent\n', None)
        self.assertAlmostEqual(1.9, compressor.ratio(), 1)

    def test_two_nosha_delta(self):
        compressor = self.compressor()
        sha1_1, _, _, _ = compressor.compress(('label',),
            b'strange\ncommon long line\nthat needs a 16 byte match\n', None)
        expected_lines = list(compressor.chunks)
        sha1_2, start_point, end_point, _ = compressor.compress(('newlabel',),
            b'common long line\nthat needs a 16 byte match\ndifferent\n', None)
        self.assertEqual(sha_string(b'common long line\n'
                                    b'that needs a 16 byte match\n'
                                    b'different\n'), sha1_2)
        expected_lines.extend([
            # 'delta', delta length
            # copy the line common
            b'\x91\x0a\x2c',  # copy, offset 0x0a, len 0x2c
            # add the line different, and the trailing newline
            b'\x0adifferent\n',  # insert 10 bytes
            ])
        self.assertEqualDiffEncoded(expected_lines, compressor.chunks)
        self.assertEqual(sum(map(len, expected_lines)), end_point)

    def test_three_nosha_delta(self):
        # The first interesting test: make a change that should use lines from
        # both parents.
        compressor = self.compressor()
        sha1_1, _, _, _ = compressor.compress(('label',),
            b'strange\ncommon very very long line\nwith some extra text\n', None)
        sha1_2, _, _, _ = compressor.compress(('newlabel',),
            b'different\nmoredifferent\nand then some more\n', None)
        expected_lines = list(compressor.chunks)
        sha1_3, start_point, end_point, _ = compressor.compress(('label3',),
            b'new\ncommon very very long line\nwith some extra text\n'
            b'different\nmoredifferent\nand then some more\n',
            None)
        self.assertEqual(
            sha_string(b'new\ncommon very very long line\nwith some extra text\n'
                       b'different\nmoredifferent\nand then some more\n'),
            sha1_3)
        expected_lines.extend([
            # 'delta', delta length
            # Copy of first parent 'common' range
            b'\x91\x0a\x30'  # copy, offset 0x0a, 0x30 bytes
            # Copy of second parent 'different' range
            b'\x91\x3c\x2b'  # copy, offset 0x3c, 0x2b bytes
            ])
        self.assertEqualDiffEncoded(expected_lines, compressor.chunks)
        self.assertEqual(sum(map(len, expected_lines)), end_point)


class TestGroupCompressBlock(tests.TestCase):

    def make_block(self, key_to_text):
        """Create a GroupCompressBlock, filling it with the given texts."""
        compressor = groupcompress.GroupCompressor()
        for key in sorted(key_to_text):
            compressor.compress(key, key_to_text[key], None)
        locs = dict((key, (start, end)) for key, (start, _, end, _)
                    in compressor.labels_deltas.items())
        block = compressor.flush()
        raw_bytes = block.to_bytes()
        # Go through from_bytes(to_bytes()) so that we start with a compressed
        # content object.
        return locs, groupcompress.GroupCompressBlock.from_bytes(raw_bytes)

    def test_from_empty_bytes(self):
        self.assertRaises(ValueError,
                          groupcompress.GroupCompressBlock.from_bytes, b'')

    def test_from_minimal_bytes(self):
        block = groupcompress.GroupCompressBlock.from_bytes(
            b'gcb1z\n0\n0\n')
        self.assertIsInstance(block, groupcompress.GroupCompressBlock)
        self.assertIs(None, block._content)
        self.assertEqual(b'', block._z_content)
        block._ensure_content()
        self.assertEqual(b'', block._content)
        self.assertEqual(b'', block._z_content)
        block._ensure_content()  # Ensure content is safe to call 2x

    def test_from_invalid(self):
        self.assertRaises(ValueError,
                          groupcompress.GroupCompressBlock.from_bytes,
                          b'this is not a valid header')

    def test_from_bytes(self):
        content = (b'a tiny bit of content\n')
        z_content = zlib.compress(content)
        z_bytes = (
            b'gcb1z\n'  # group compress block v1 zlib
            b'%d\n'     # Length of compressed content
            b'%d\n'     # Length of uncompressed content
            b'%s'       # Compressed content
            ) % (len(z_content), len(content), z_content)
        block = groupcompress.GroupCompressBlock.from_bytes(
            z_bytes)
        self.assertEqual(z_content, block._z_content)
        self.assertIs(None, block._content)
        self.assertEqual(len(z_content), block._z_content_length)
        self.assertEqual(len(content), block._content_length)
        block._ensure_content()
        self.assertEqual(z_content, block._z_content)
        self.assertEqual(content, block._content)

    def test_to_chunks(self):
        content_chunks = [b'this is some content\n',
                          b'this content will be compressed\n']
        content_len = sum(map(len, content_chunks))
        content = b''.join(content_chunks)
        gcb = groupcompress.GroupCompressBlock()
        gcb.set_chunked_content(content_chunks, content_len)
        total_len, block_chunks = gcb.to_chunks()
        block_bytes = b''.join(block_chunks)
        self.assertEqual(gcb._z_content_length, len(gcb._z_content))
        self.assertEqual(total_len, len(block_bytes))
        self.assertEqual(gcb._content_length, content_len)
        expected_header = (b'gcb1z\n'  # group compress block v1 zlib
                           b'%d\n'     # Length of compressed content
                           b'%d\n'     # Length of uncompressed content
                           ) % (gcb._z_content_length, gcb._content_length)
        # The first chunk should be the header chunk. It is small, fixed size,
        # and there is no compelling reason to split it up
        self.assertEqual(expected_header, block_chunks[0])
        self.assertStartsWith(block_bytes, expected_header)
        remaining_bytes = block_bytes[len(expected_header):]
        raw_bytes = zlib.decompress(remaining_bytes)
        self.assertEqual(content, raw_bytes)

    def test_to_bytes(self):
        content = (b'this is some content\n'
                   b'this content will be compressed\n')
        gcb = groupcompress.GroupCompressBlock()
        gcb.set_content(content)
        data = gcb.to_bytes()
        self.assertEqual(gcb._z_content_length, len(gcb._z_content))
        self.assertEqual(gcb._content_length, len(content))
        expected_header = (b'gcb1z\n'  # group compress block v1 zlib
                           b'%d\n'     # Length of compressed content
                           b'%d\n'     # Length of uncompressed content
                           ) % (gcb._z_content_length, gcb._content_length)
        self.assertStartsWith(data, expected_header)
        remaining_bytes = data[len(expected_header):]
        raw_bytes = zlib.decompress(remaining_bytes)
        self.assertEqual(content, raw_bytes)

        # we should get the same results if using the chunked version
        gcb = groupcompress.GroupCompressBlock()
        gcb.set_chunked_content([b'this is some content\n'
                                 b'this content will be compressed\n'],
                                len(content))
        old_data = data
        data = gcb.to_bytes()
        self.assertEqual(old_data, data)

    def test_partial_decomp(self):
        content_chunks = []
        # We need a sufficient amount of data so that zlib.decompress has
        # partial decompression to work with. Most auto-generated data
        # compresses a bit too well, we want a combination, so we combine a sha
        # hash with compressible data.
        for i in range(2048):
            next_content = b'%d\nThis is a bit of duplicate text\n' % (i,)
            content_chunks.append(next_content)
            next_sha1 = osutils.sha_string(next_content)
            content_chunks.append(next_sha1 + b'\n')
        content = b''.join(content_chunks)
        self.assertEqual(158634, len(content))
        z_content = zlib.compress(content)
        self.assertEqual(57182, len(z_content))
        block = groupcompress.GroupCompressBlock()
        block._z_content_chunks = (z_content,)
        block._z_content_length = len(z_content)
        block._compressor_name = 'zlib'
        block._content_length = 158634
        self.assertIs(None, block._content)
        block._ensure_content(100)
        self.assertIsNot(None, block._content)
        # We have decompressed at least 100 bytes
        self.assertTrue(len(block._content) >= 100)
        # We have not decompressed the whole content
        self.assertTrue(len(block._content) < 158634)
        self.assertEqualDiff(content[:len(block._content)], block._content)
        # ensuring content that we already have shouldn't cause any more data
        # to be decompressed
        cur_len = len(block._content)
        block._ensure_content(cur_len - 10)
        self.assertEqual(cur_len, len(block._content))
        # Now we want a bit more content
        cur_len += 10
        block._ensure_content(cur_len)
        self.assertTrue(len(block._content) >= cur_len)
        self.assertTrue(len(block._content) < 158634)
        self.assertEqualDiff(content[:len(block._content)], block._content)
        # And now lets finish
        block._ensure_content(158634)
        self.assertEqualDiff(content, block._content)
        # And the decompressor is finalized
        self.assertIs(None, block._z_content_decompressor)

    def test__ensure_all_content(self):
        content_chunks = []
        # We need a sufficient amount of data so that zlib.decompress has
        # partial decompression to work with. Most auto-generated data
        # compresses a bit too well, we want a combination, so we combine a sha
        # hash with compressible data.
        for i in range(2048):
            next_content = b'%d\nThis is a bit of duplicate text\n' % (i,)
            content_chunks.append(next_content)
            next_sha1 = osutils.sha_string(next_content)
            content_chunks.append(next_sha1 + b'\n')
        content = b''.join(content_chunks)
        self.assertEqual(158634, len(content))
        z_content = zlib.compress(content)
        self.assertEqual(57182, len(z_content))
        block = groupcompress.GroupCompressBlock()
        block._z_content_chunks = (z_content,)
        block._z_content_length = len(z_content)
        block._compressor_name = 'zlib'
        block._content_length = 158634
        self.assertIs(None, block._content)
        # The first _ensure_content got all of the required data
        block._ensure_content(158634)
        self.assertEqualDiff(content, block._content)
        # And we should have released the _z_content_decompressor since it was
        # fully consumed
        self.assertIs(None, block._z_content_decompressor)

    def test__dump(self):
        dup_content = b'some duplicate content\nwhich is sufficiently long\n'
        key_to_text = {(b'1',): dup_content + b'1 unique\n',
                       (b'2',): dup_content + b'2 extra special\n'}
        locs, block = self.make_block(key_to_text)
        self.assertEqual([(b'f', len(key_to_text[(b'1',)])),
                          (b'd', 21, len(key_to_text[(b'2',)]),
                           [(b'c', 2, len(dup_content)),
                            (b'i', len(b'2 extra special\n'), b'')
                            ]),
                          ], block._dump())


class TestCaseWithGroupCompressVersionedFiles(
        tests.TestCaseWithMemoryTransport):

    def make_test_vf(self, create_graph, keylength=1, do_cleanup=True,
                     dir='.', inconsistency_fatal=True):
        t = self.get_transport(dir)
        vf = groupcompress.make_pack_factory(graph=create_graph,
            delta=False, keylength=keylength,
            inconsistency_fatal=inconsistency_fatal)(t)
        if do_cleanup:
            self.addCleanup(groupcompress.cleanup_pack_group, vf)
        return vf


class TestGroupCompressVersionedFiles(TestCaseWithGroupCompressVersionedFiles):

    def make_g_index(self, name, ref_lists=0, nodes=[]):
        builder = btree_index.BTreeBuilder(ref_lists)
        for node, references, value in nodes:
            builder.add_node(node, references, value)
        stream = builder.finish()
        trans = self.get_transport()
        size = trans.put_file(name, stream)
        return btree_index.BTreeGraphIndex(trans, name, size)

    def make_g_index_missing_parent(self):
        graph_index = self.make_g_index('missing_parent', 1,
            [((b'parent', ), b'2 78 2 10', ([],)),
             ((b'tip', ), b'2 78 2 10',
              ([(b'parent', ), (b'missing-parent', )],)),
             ])
        return graph_index

    def test_get_record_stream_as_requested(self):
        # Consider promoting 'as-requested' to general availability, and
        # make this a VF interface test
        vf = self.make_test_vf(False, dir='source')
        vf.add_lines((b'a',), (), [b'lines\n'])
        vf.add_lines((b'b',), (), [b'lines\n'])
        vf.add_lines((b'c',), (), [b'lines\n'])
        vf.add_lines((b'd',), (), [b'lines\n'])

        keys = [record.key for record in vf.get_record_stream(
            [(b'a',), (b'b',), (b'c',), (b'd',)],
            'as-requested', False)]
        self.assertEqual([(b'a',), (b'b',), (b'c',), (b'd',)], keys)
        keys = [record.key for record in vf.get_record_stream(
            [(b'b',), (b'a',), (b'd',), (b'c',)],
            'as-requested', False)]
        self.assertEqual([(b'b',), (b'a',), (b'd',), (b'c',)], keys)

        # It should work even after being repacked into another VF
        vf2 = self.make_test_vf(False, dir='target')
        vf2.insert_record_stream(vf.get_record_stream(
            [(b'b',), (b'a',), (b'd',), (b'c',)], 'as-requested', False))

        keys = [record.key for record in vf2.get_record_stream(
            [(b'a',), (b'b',), (b'c',), (b'd',)],
            'as-requested', False)]
        self.assertEqual([(b'a',), (b'b',), (b'c',), (b'd',)], keys)
        keys = [record.key for record in vf2.get_record_stream(
            [(b'b',), (b'a',), (b'd',), (b'c',)],
            'as-requested', False)]
        self.assertEqual([(b'b',), (b'a',), (b'd',), (b'c',)], keys)

    def test_get_record_stream_max_bytes_to_index_default(self):
        vf = self.make_test_vf(True, dir='source')
        vf.add_lines((b'a',), (), [b'lines\n'])
        record = next(vf.get_record_stream([(b'a',)], 'unordered', True))
        self.assertEqual(vf._DEFAULT_COMPRESSOR_SETTINGS,
                         record._manager._get_compressor_settings())

    def test_get_record_stream_accesses_compressor_settings(self):
        vf = self.make_test_vf(True, dir='source')
        vf.add_lines((b'a',), (), [b'lines\n'])
        vf._max_bytes_to_index = 1234
        record = next(vf.get_record_stream([(b'a',)], 'unordered', True))
        self.assertEqual(dict(max_bytes_to_index=1234),
                         record._manager._get_compressor_settings())

    def grouped_stream(self, revision_ids, first_parents=()):
        parents = first_parents
        for revision_id in revision_ids:
            key = (revision_id,)
            record = versionedfile.FulltextContentFactory(
                key, parents, None,
                b'some content that is\n'
                b'identical except for\n'
                b'revision_id:%s\n' % (revision_id,))
            yield record
            parents = (key,)
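
    # Helper for the insert_record_stream tests below: every yielded text is
    # identical apart from its revision_id line, so a run of these records
    # compresses into a single group.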

    def test_insert_record_stream_reuses_blocks(self):
        vf = self.make_test_vf(True, dir='source')
        vf.insert_record_stream(self.grouped_stream([b'a', b'b', b'c', b'd']))
        vf.insert_record_stream(self.grouped_stream(
            [b'e', b'f', b'g', b'h'], first_parents=((b'd',),)))
        block_bytes = {}
        stream = vf.get_record_stream(
            [(r.encode(),) for r in 'abcdefgh'], 'unordered', False)
        num_records = 0
        for record in stream:
            if record.key in [(b'a',), (b'e',)]:
                self.assertEqual('groupcompress-block', record.storage_kind)
            else:
                self.assertEqual('groupcompress-block-ref',
                                 record.storage_kind)
            block_bytes[record.key] = record._manager._block._z_content
            num_records += 1
        self.assertEqual(8, num_records)
        for r in 'abcd':
            key = (r.encode(),)
            self.assertIs(block_bytes[key], block_bytes[(b'a',)])
            self.assertNotEqual(block_bytes[key], block_bytes[(b'e',)])
        for r in 'efgh':
            key = (r.encode(),)
            self.assertIs(block_bytes[key], block_bytes[(b'e',)])
            self.assertNotEqual(block_bytes[key], block_bytes[(b'a',)])
        # Now copy the blocks into another vf, and ensure that the blocks are
        # preserved without creating new entries
        vf2 = self.make_test_vf(True, dir='target')
        keys = [(r.encode(),) for r in 'abcdefgh']
        # ordering in 'groupcompress' order, should actually swap the groups in
        # the target vf, but the groups themselves should not be disturbed.

        def small_size_stream():
            for record in vf.get_record_stream(keys, 'groupcompress', False):
                record._manager._full_enough_block_size = \
                    record._manager._block._content_length
                yield record

        vf2.insert_record_stream(small_size_stream())
        stream = vf2.get_record_stream(keys, 'groupcompress', False)
        num_records = 0
        for record in stream:
            num_records += 1
            self.assertEqual(block_bytes[record.key],
                             record._manager._block._z_content)
        self.assertEqual(8, num_records)

    def test_insert_record_stream_packs_on_the_fly(self):
        vf = self.make_test_vf(True, dir='source')
        vf.insert_record_stream(self.grouped_stream([b'a', b'b', b'c', b'd']))
        vf.insert_record_stream(self.grouped_stream(
            [b'e', b'f', b'g', b'h'], first_parents=((b'd',),)))
        # Now copy the blocks into another vf, and see that the
        # insert_record_stream rebuilt a new block on-the-fly because of
        # insufficient utilization
        vf2 = self.make_test_vf(True, dir='target')
        keys = [(r.encode(),) for r in 'abcdefgh']
        vf2.insert_record_stream(vf.get_record_stream(
            keys, 'groupcompress', False))
        stream = vf2.get_record_stream(keys, 'groupcompress', False)
        num_records = 0
        # All of the records should be recombined into a single block
        block = None
        for record in stream:
            num_records += 1
            if block is None:
                block = record._manager._block
            else:
                self.assertIs(block, record._manager._block)
        self.assertEqual(8, num_records)

    def test__insert_record_stream_no_reuse_block(self):
        vf = self.make_test_vf(True, dir='source')
        vf.insert_record_stream(self.grouped_stream([b'a', b'b', b'c', b'd']))
        vf.insert_record_stream(self.grouped_stream(
            [b'e', b'f', b'g', b'h'], first_parents=((b'd',),)))
        keys = [(r.encode(),) for r in 'abcdefgh']
        self.assertEqual(8, len(list(
            vf.get_record_stream(keys, 'unordered', False))))
        # Now copy the blocks into another vf, and ensure that the blocks are
        # preserved without creating new entries
        vf2 = self.make_test_vf(True, dir='target')
        # ordering in 'groupcompress' order, should actually swap the groups in
        # the target vf, but the groups themselves should not be disturbed.
        list(vf2._insert_record_stream(vf.get_record_stream(
            keys, 'groupcompress', False),
            reuse_blocks=False))
        # After inserting with reuse_blocks=False, we should have everything in
        # a single new block.
        stream = vf2.get_record_stream(keys, 'groupcompress', False)
        block = None
        for record in stream:
            if block is None:
                block = record._manager._block
            else:
                self.assertIs(block, record._manager._block)

    def test_add_missing_noncompression_parent_unvalidated_index(self):
        unvalidated = self.make_g_index_missing_parent()
        combined = _mod_index.CombinedGraphIndex([unvalidated])
        index = groupcompress._GCGraphIndex(combined,
            is_locked=lambda: True, parents=True,
            track_external_parent_refs=True)
        index.scan_unvalidated_index(unvalidated)
        self.assertEqual(
            frozenset([(b'missing-parent',)]), index.get_missing_parents())

    def test_track_external_parent_refs(self):
        g_index = self.make_g_index('empty', 1, [])
        mod_index = btree_index.BTreeBuilder(1, 1)
        combined = _mod_index.CombinedGraphIndex([g_index, mod_index])
        index = groupcompress._GCGraphIndex(combined,
            is_locked=lambda: True, parents=True,
            add_callback=mod_index.add_nodes,
            track_external_parent_refs=True)
        index.add_records([
            ((b'new-key',), b'2 10 2 10', [((b'parent-1',), (b'parent-2',))])])
        self.assertEqual(
            frozenset([(b'parent-1',), (b'parent-2',)]),
            index.get_missing_parents())

    def make_source_with_b(self, a_parent, path):
        source = self.make_test_vf(True, dir=path)
        source.add_lines((b'a',), (), [b'lines\n'])
        if a_parent:
            b_parents = ((b'a',),)
        else:
            b_parents = ()
        source.add_lines((b'b',), b_parents, [b'lines\n'])
        return source

    def do_inconsistent_inserts(self, inconsistency_fatal):
        target = self.make_test_vf(True, dir='target',
                                   inconsistency_fatal=inconsistency_fatal)
        for x in range(2):
            source = self.make_source_with_b(x == 1, 'source%s' % x)
            target.insert_record_stream(source.get_record_stream(
                [(b'b',)], 'unordered', False))
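
    # The two sources record different parents for (b'b',), so inserting both
    # streams into one target triggers the "inconsistent details" handling the
    # tests below exercise.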

    def test_inconsistent_redundant_inserts_warn(self):
        """Should not insert a record that is already present."""
        warnings = []

        def warning(template, args):
            warnings.append(template % args)
        _trace_warning = trace.warning
        trace.warning = warning
        try:
            self.do_inconsistent_inserts(inconsistency_fatal=False)
        finally:
            trace.warning = _trace_warning
        self.assertContainsRe(
            warnings[0],
            r"^inconsistent details in skipped record: \(b?'b',\)"
            r" \(b?'42 32 0 8', \(\(\),\)\)"
            r" \(b?'74 32 0 8', \(\(\(b?'a',\),\),\)\)$")

    def test_inconsistent_redundant_inserts_raises(self):
        e = self.assertRaises(knit.KnitCorrupt, self.do_inconsistent_inserts,
                              inconsistency_fatal=True)
        self.assertContainsRe(str(e), r"Knit.* corrupt: inconsistent details"
                              r" \(b?'b',\) \(b?'42 32 0 8', \(\(\),\)\)"
                              r" \(b?'74 32 0 8', \(\(\(b?'a',\),\),\)\)")

    def test_clear_cache(self):
        vf = self.make_source_with_b(True, 'source')
        for record in vf.get_record_stream([(b'a',), (b'b',)], 'unordered',
                                           True):
            pass
        self.assertTrue(len(vf._group_cache) > 0)
        vf.clear_cache()
        self.assertEqual(0, len(vf._group_cache))


class TestGroupCompressConfig(tests.TestCaseWithTransport):

    def make_test_vf(self):
        t = self.get_transport('.')
        factory = groupcompress.make_pack_factory(graph=True,
            delta=False, keylength=1, inconsistency_fatal=True)
        vf = factory(t)
        self.addCleanup(groupcompress.cleanup_pack_group, vf)
        return vf

    def test_max_bytes_to_index_default(self):
        vf = self.make_test_vf()
        gc = vf._make_group_compressor()
        self.assertEqual(vf._DEFAULT_MAX_BYTES_TO_INDEX,
                         vf._max_bytes_to_index)
        if isinstance(gc, groupcompress.PyrexGroupCompressor):
            self.assertEqual(vf._DEFAULT_MAX_BYTES_TO_INDEX,
                             gc._delta_index._max_bytes_to_index)

    def test_max_bytes_to_index_in_config(self):
        c = config.GlobalConfig()
        c.set_user_option('bzr.groupcompress.max_bytes_to_index', '10000')
        vf = self.make_test_vf()
        gc = vf._make_group_compressor()
        self.assertEqual(10000, vf._max_bytes_to_index)
        if isinstance(gc, groupcompress.PyrexGroupCompressor):
            self.assertEqual(10000, gc._delta_index._max_bytes_to_index)

    def test_max_bytes_to_index_bad_config(self):
        c = config.GlobalConfig()
        c.set_user_option('bzr.groupcompress.max_bytes_to_index', 'boogah')
        vf = self.make_test_vf()
        # TODO: This is triggering a warning, we might want to trap and make
        # sure it is readable.
        gc = vf._make_group_compressor()
        self.assertEqual(vf._DEFAULT_MAX_BYTES_TO_INDEX,
                         vf._max_bytes_to_index)
        if isinstance(gc, groupcompress.PyrexGroupCompressor):
            self.assertEqual(vf._DEFAULT_MAX_BYTES_TO_INDEX,
                             gc._delta_index._max_bytes_to_index)


class StubGCVF(object):

    def __init__(self, canned_get_blocks=None):
        self._group_cache = {}
        self._canned_get_blocks = canned_get_blocks or []

    def _get_blocks(self, read_memos):
        return iter(self._canned_get_blocks)
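
# StubGCVF stands in for a GroupCompressVersionedFiles in the batching tests
# below: it only provides the attributes that _BatchingBlockFetcher actually
# uses, a _group_cache dict and a _get_blocks() iterator of canned blocks.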


class Test_BatchingBlockFetcher(TestCaseWithGroupCompressVersionedFiles):
    """Simple whitebox unit tests for _BatchingBlockFetcher."""

    def test_add_key_new_read_memo(self):
        """Adding a key with an uncached read_memo new to this batch adds that
        read_memo to the list of memos to fetch.
        """
        # locations are: index_memo, ignored, parents, ignored
        # where index_memo is: (idx, offset, len, factory_start, factory_end)
        # and (idx, offset, size) is known as the 'read_memo', identifying the
        # block of compressed bytes that needs to be read.
        read_memo = ('fake index', 100, 50)
        locations = {
            ('key',): (read_memo + (None, None), None, None, None)}
        batcher = groupcompress._BatchingBlockFetcher(StubGCVF(), locations)
        total_size = batcher.add_key(('key',))
        self.assertEqual(50, total_size)
        self.assertEqual([('key',)], batcher.keys)
        self.assertEqual([read_memo], batcher.memos_to_get)

    def test_add_key_duplicate_read_memo(self):
        """read_memos that occur multiple times in a batch will only be fetched
        once.
        """
        read_memo = ('fake index', 100, 50)
        # Two keys, both sharing the same read memo (but different overall
        # index_memos).
        locations = {
            ('key1',): (read_memo + (0, 1), None, None, None),
            ('key2',): (read_memo + (1, 2), None, None, None)}
        batcher = groupcompress._BatchingBlockFetcher(StubGCVF(), locations)
        total_size = batcher.add_key(('key1',))
        total_size = batcher.add_key(('key2',))
        self.assertEqual(50, total_size)
        self.assertEqual([('key1',), ('key2',)], batcher.keys)
        self.assertEqual([read_memo], batcher.memos_to_get)

    def test_add_key_cached_read_memo(self):
        """Adding a key with a cached read_memo will not cause that read_memo
        to be added to the list to fetch.
        """
        read_memo = ('fake index', 100, 50)
        gcvf = StubGCVF()
        gcvf._group_cache[read_memo] = 'fake block'
        locations = {
            ('key',): (read_memo + (None, None), None, None, None)}
        batcher = groupcompress._BatchingBlockFetcher(gcvf, locations)
        total_size = batcher.add_key(('key',))
        self.assertEqual(0, total_size)
        self.assertEqual([('key',)], batcher.keys)
        self.assertEqual([], batcher.memos_to_get)

    def test_yield_factories_empty(self):
        """An empty batch yields no factories."""
        batcher = groupcompress._BatchingBlockFetcher(StubGCVF(), {})
        self.assertEqual([], list(batcher.yield_factories()))

    def test_yield_factories_calls_get_blocks(self):
        """Uncached memos are retrieved via get_blocks."""
        read_memo1 = ('fake index', 100, 50)
        read_memo2 = ('fake index', 150, 40)
        gcvf = StubGCVF(
            canned_get_blocks=[
                (read_memo1, groupcompress.GroupCompressBlock()),
                (read_memo2, groupcompress.GroupCompressBlock())])
        locations = {
            ('key1',): (read_memo1 + (0, 0), None, None, None),
            ('key2',): (read_memo2 + (0, 0), None, None, None)}
        batcher = groupcompress._BatchingBlockFetcher(gcvf, locations)
        batcher.add_key(('key1',))
        batcher.add_key(('key2',))
        factories = list(batcher.yield_factories(full_flush=True))
        self.assertLength(2, factories)
        keys = [f.key for f in factories]
        kinds = [f.storage_kind for f in factories]
        self.assertEqual([('key1',), ('key2',)], keys)
        self.assertEqual(['groupcompress-block', 'groupcompress-block'], kinds)

    def test_yield_factories_flushing(self):
        """yield_factories holds back on yielding results from the final block
        unless passed full_flush=True.
        """
        fake_block = groupcompress.GroupCompressBlock()
        read_memo = ('fake index', 100, 50)
        gcvf = StubGCVF()
        gcvf._group_cache[read_memo] = fake_block
        locations = {
            ('key',): (read_memo + (0, 0), None, None, None)}
        batcher = groupcompress._BatchingBlockFetcher(gcvf, locations)
        batcher.add_key(('key',))
        self.assertEqual([], list(batcher.yield_factories()))
        factories = list(batcher.yield_factories(full_flush=True))
        self.assertLength(1, factories)
        self.assertEqual(('key',), factories[0].key)
        self.assertEqual('groupcompress-block', factories[0].storage_kind)


class TestLazyGroupCompress(tests.TestCaseWithTransport):

    _texts = {
        (b'key1',): b"this is a text\n"
                    b"with a reasonable amount of compressible bytes\n"
                    b"which can be shared between various other texts\n",
        (b'key2',): b"another text\n"
                    b"with a reasonable amount of compressible bytes\n"
                    b"which can be shared between various other texts\n",
        (b'key3',): b"yet another text which won't be extracted\n"
                    b"with a reasonable amount of compressible bytes\n"
                    b"which can be shared between various other texts\n",
        (b'key4',): b"this will be extracted\n"
                    b"but references most of its bytes from\n"
                    b"yet another text which won't be extracted\n"
                    b"with a reasonable amount of compressible bytes\n"
                    b"which can be shared between various other texts\n",
        }

    def make_block(self, key_to_text):
        """Create a GroupCompressBlock, filling it with the given texts."""
        compressor = groupcompress.GroupCompressor()
        for key in sorted(key_to_text):
            compressor.compress(key, key_to_text[key], None)
        locs = dict((key, (start, end)) for key, (start, _, end, _)
                    in compressor.labels_deltas.items())
        block = compressor.flush()
        raw_bytes = block.to_bytes()
        return locs, groupcompress.GroupCompressBlock.from_bytes(raw_bytes)

    def add_key_to_manager(self, key, locations, block, manager):
        start, end = locations[key]
        manager.add_factory(key, (), start, end)

    def make_block_and_full_manager(self, texts):
        locations, block = self.make_block(texts)
        manager = groupcompress._LazyGroupContentManager(block)
        for key in sorted(texts):
            self.add_key_to_manager(key, locations, block, manager)
        return block, manager

    def test_get_fulltexts(self):
        locations, block = self.make_block(self._texts)
        manager = groupcompress._LazyGroupContentManager(block)
        self.add_key_to_manager((b'key1',), locations, block, manager)
        self.add_key_to_manager((b'key2',), locations, block, manager)
        result_order = []
        for record in manager.get_record_stream():
            result_order.append(record.key)
            text = self._texts[record.key]
            self.assertEqual(text, record.get_bytes_as('fulltext'))
        self.assertEqual([(b'key1',), (b'key2',)], result_order)

        # If we build the manager in the opposite order, we should get them
        # back in the opposite order
        manager = groupcompress._LazyGroupContentManager(block)
        self.add_key_to_manager((b'key2',), locations, block, manager)
        self.add_key_to_manager((b'key1',), locations, block, manager)
        result_order = []
        for record in manager.get_record_stream():
            result_order.append(record.key)
            text = self._texts[record.key]
            self.assertEqual(text, record.get_bytes_as('fulltext'))
        self.assertEqual([(b'key2',), (b'key1',)], result_order)

    def test__wire_bytes_no_keys(self):
        locations, block = self.make_block(self._texts)
        manager = groupcompress._LazyGroupContentManager(block)
        wire_bytes = manager._wire_bytes()
        block_length = len(block.to_bytes())
        # We should have triggered a strip, since we aren't using any content
        stripped_block = manager._block.to_bytes()
        self.assertTrue(block_length > len(stripped_block))
        empty_z_header = zlib.compress(b'')
        self.assertEqual(b'groupcompress-block\n'
                         b'8\n'   # len(compress(''))
                         b'0\n'   # len of uncompressed header
                         b'%d\n'  # compressed block len
                         b'%s'    # the compressed (empty) header
                         b'%s'    # the stripped block
                         % (len(stripped_block), empty_z_header,
                            stripped_block),
                         wire_bytes)

    def test__wire_bytes(self):
        locations, block = self.make_block(self._texts)
        manager = groupcompress._LazyGroupContentManager(block)
        self.add_key_to_manager((b'key1',), locations, block, manager)
        self.add_key_to_manager((b'key4',), locations, block, manager)
        block_bytes = block.to_bytes()
        wire_bytes = manager._wire_bytes()
        (storage_kind, z_header_len, header_len,
         block_len, rest) = wire_bytes.split(b'\n', 4)
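        # Wire layout: a storage-kind line, then the compressed header length,
        # the uncompressed header length and the block length (one per line),
        # followed by the zlib-compressed header and the serialized block.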
        z_header_len = int(z_header_len)
        header_len = int(header_len)
        block_len = int(block_len)
        self.assertEqual(b'groupcompress-block', storage_kind)
        self.assertEqual(34, z_header_len)
        self.assertEqual(26, header_len)
        self.assertEqual(len(block_bytes), block_len)
        z_header = rest[:z_header_len]
        header = zlib.decompress(z_header)
        self.assertEqual(header_len, len(header))
        entry1 = locations[(b'key1',)]
        entry4 = locations[(b'key4',)]
        self.assertEqualDiff(b'key1\n'
                             b'\n'    # no parents
                             b'%d\n'  # start offset
                             b'%d\n'  # end offset
                             b'key4\n'
                             b'\n'
                             b'%d\n'
                             b'%d\n'
                             % (entry1[0], entry1[1],
                                entry4[0], entry4[1]),
                             header)
        z_block = rest[z_header_len:]
        self.assertEqual(block_bytes, z_block)

    def test_from_bytes(self):
        locations, block = self.make_block(self._texts)
        manager = groupcompress._LazyGroupContentManager(block)
        self.add_key_to_manager((b'key1',), locations, block, manager)
        self.add_key_to_manager((b'key4',), locations, block, manager)
        wire_bytes = manager._wire_bytes()
        self.assertStartsWith(wire_bytes, b'groupcompress-block\n')
        manager = groupcompress._LazyGroupContentManager.from_bytes(wire_bytes)
        self.assertIsInstance(manager, groupcompress._LazyGroupContentManager)
        self.assertEqual(2, len(manager._factories))
        self.assertEqual(block._z_content, manager._block._z_content)
        result_order = []
        for record in manager.get_record_stream():
            result_order.append(record.key)
            text = self._texts[record.key]
            self.assertEqual(text, record.get_bytes_as('fulltext'))
        self.assertEqual([(b'key1',), (b'key4',)], result_order)

    def test__check_rebuild_no_changes(self):
        block, manager = self.make_block_and_full_manager(self._texts)
        manager._check_rebuild_block()
        self.assertIs(block, manager._block)

    def test__check_rebuild_only_one(self):
        locations, block = self.make_block(self._texts)
        manager = groupcompress._LazyGroupContentManager(block)
        # Request just the first key, which should trigger a 'strip' action
        self.add_key_to_manager((b'key1',), locations, block, manager)
        manager._check_rebuild_block()
        self.assertIsNot(block, manager._block)
        self.assertTrue(block._content_length > manager._block._content_length)
        # We should be able to still get the content out of this block, though
        # it should only have 1 entry
        for record in manager.get_record_stream():
            self.assertEqual((b'key1',), record.key)
            self.assertEqual(self._texts[record.key],
                             record.get_bytes_as('fulltext'))

    def test__check_rebuild_middle(self):
        locations, block = self.make_block(self._texts)
        manager = groupcompress._LazyGroupContentManager(block)
        # Requesting a small key in the middle should trigger a 'rebuild'
        self.add_key_to_manager((b'key4',), locations, block, manager)
        manager._check_rebuild_block()
        self.assertIsNot(block, manager._block)
        self.assertTrue(block._content_length > manager._block._content_length)
        for record in manager.get_record_stream():
            self.assertEqual((b'key4',), record.key)
            self.assertEqual(self._texts[record.key],
                             record.get_bytes_as('fulltext'))

    def test_manager_default_compressor_settings(self):
        locations, old_block = self.make_block(self._texts)
        manager = groupcompress._LazyGroupContentManager(old_block)
        gcvf = groupcompress.GroupCompressVersionedFiles
        # It doesn't greedily evaluate _max_bytes_to_index
        self.assertIs(None, manager._compressor_settings)
        self.assertEqual(gcvf._DEFAULT_COMPRESSOR_SETTINGS,
                         manager._get_compressor_settings())

    def test_manager_custom_compressor_settings(self):
        locations, old_block = self.make_block(self._texts)
        called = []

        def compressor_settings():
            called.append('called')
            return (10,)
        manager = groupcompress._LazyGroupContentManager(old_block,
            get_compressor_settings=compressor_settings)
        gcvf = groupcompress.GroupCompressVersionedFiles
        # It doesn't greedily evaluate compressor_settings
        self.assertIs(None, manager._compressor_settings)
        self.assertEqual((10,), manager._get_compressor_settings())
        self.assertEqual((10,), manager._get_compressor_settings())
        self.assertEqual((10,), manager._compressor_settings)
        # Only called 1 time
        self.assertEqual(['called'], called)

    def test__rebuild_handles_compressor_settings(self):
        if (groupcompress.GroupCompressor
                is not groupcompress.PyrexGroupCompressor):
            raise tests.TestNotApplicable('pure-python compressor'
                ' does not handle compressor_settings')
        locations, old_block = self.make_block(self._texts)
        manager = groupcompress._LazyGroupContentManager(old_block,
            get_compressor_settings=lambda: dict(max_bytes_to_index=32))
        gc = manager._make_group_compressor()
        self.assertEqual(32, gc._delta_index._max_bytes_to_index)
        self.add_key_to_manager((b'key3',), locations, old_block, manager)
        self.add_key_to_manager((b'key4',), locations, old_block, manager)
        action, last_byte, total_bytes = manager._check_rebuild_action()
        self.assertEqual('rebuild', action)
        manager._rebuild_block()
        new_block = manager._block
        self.assertIsNot(old_block, new_block)
        # Because of the new max_bytes_to_index, we do a poor job of
        # rebuilding. This is a side-effect of the change, but at least it does
        # show the setting had an effect.
        self.assertTrue(old_block._content_length < new_block._content_length)

    def test_check_is_well_utilized_all_keys(self):
        block, manager = self.make_block_and_full_manager(self._texts)
        self.assertFalse(manager.check_is_well_utilized())
        # Though we can fake it by changing the recommended minimum size
        manager._full_enough_block_size = block._content_length
        self.assertTrue(manager.check_is_well_utilized())
        # Setting it just above causes it to fail
        manager._full_enough_block_size = block._content_length + 1
        self.assertFalse(manager.check_is_well_utilized())
        # Setting the mixed-block size doesn't do anything, because the content
        # is considered to not be 'mixed'
        manager._full_enough_mixed_block_size = block._content_length
        self.assertFalse(manager.check_is_well_utilized())

    def test_check_is_well_utilized_mixed_keys(self):
        texts = {}
        f1k1 = (b'f1', b'k1')
        f1k2 = (b'f1', b'k2')
        f2k1 = (b'f2', b'k1')
        f2k2 = (b'f2', b'k2')
        texts[f1k1] = self._texts[(b'key1',)]
        texts[f1k2] = self._texts[(b'key2',)]
        texts[f2k1] = self._texts[(b'key3',)]
        texts[f2k2] = self._texts[(b'key4',)]
        block, manager = self.make_block_and_full_manager(texts)
        self.assertFalse(manager.check_is_well_utilized())
        manager._full_enough_block_size = block._content_length
        self.assertTrue(manager.check_is_well_utilized())
        manager._full_enough_block_size = block._content_length + 1
        self.assertFalse(manager.check_is_well_utilized())
        manager._full_enough_mixed_block_size = block._content_length
        self.assertTrue(manager.check_is_well_utilized())

    def test_check_is_well_utilized_partial_use(self):
        locations, block = self.make_block(self._texts)
        manager = groupcompress._LazyGroupContentManager(block)
        manager._full_enough_block_size = block._content_length
        self.add_key_to_manager((b'key1',), locations, block, manager)
        self.add_key_to_manager((b'key2',), locations, block, manager)
        # Just using the content from key1 and 2 is not enough to be considered
        # 'well utilized'
        self.assertFalse(manager.check_is_well_utilized())
        # However if we add key4, then we have enough, as we only require 75%
        # utilization
        self.add_key_to_manager((b'key4',), locations, block, manager)
        self.assertTrue(manager.check_is_well_utilized())


class Test_GCBuildDetails(tests.TestCase):

    def test_acts_like_tuple(self):
        # _GCBuildDetails inlines some of the data that used to be spread out
        # across a bunch of tuples
        bd = groupcompress._GCBuildDetails((('parent1',), ('parent2',)),
            ('INDEX', 10, 20, 0, 5))
        self.assertEqual(4, len(bd))
        self.assertEqual(('INDEX', 10, 20, 0, 5), bd[0])
        self.assertEqual(None, bd[1])  # Compression Parent is always None
        self.assertEqual((('parent1',), ('parent2',)), bd[2])
        self.assertEqual(('group', None), bd[3])  # Record details

    def test__repr__(self):
        bd = groupcompress._GCBuildDetails((('parent1',), ('parent2',)),
            ('INDEX', 10, 20, 0, 5))
        self.assertEqual("_GCBuildDetails(('INDEX', 10, 20, 0, 5),"
                         " (('parent1',), ('parent2',)))",