# Copyright (C) 2008-2011 Canonical Ltd
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA

"""Tests for group compression."""

import zlib

# NOTE: the package-level imports below are reconstructed from how the names
# are used in this module; the exact module locations are assumed from the
# breezy tree layout.
from .. import (
    config,
    osutils,
    tests,
    trace,
    )
from ..bzr import (
    btree_index,
    groupcompress,
    index as _mod_index,
    knit,
    versionedfile,
    )
from ..osutils import sha_string
from .test__groupcompress import compiled_groupcompress_feature
from .scenarios import load_tests_apply_scenarios
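

# The scenario-aware tests below are run once per entry returned here:
# load_tests_apply_scenarios multiplies each test across the pure-Python
# compressor and, when the compiled extension is available, the C (Pyrex)
# compressor.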
def group_compress_implementation_scenarios():
    scenarios = [
        ('python', {'compressor': groupcompress.PythonGroupCompressor}),
        ]
    if compiled_groupcompress_feature.available():
        scenarios.append(('C',
            {'compressor': groupcompress.PyrexGroupCompressor}))
    return scenarios


load_tests = load_tests_apply_scenarios


class TestGroupCompressor(tests.TestCase):

    def _chunks_to_repr_lines(self, chunks):
        return '\n'.join(map(repr, b''.join(chunks).split(b'\n')))

    def assertEqualDiffEncoded(self, expected, actual):
        """Compare the actual content to the expected content.

        :param expected: A group of chunks that we expect to see
        :param actual: The measured 'chunks'

        We will transform the chunks back into lines, and then run 'repr()'
        over them to handle non-ascii characters.
        """
        self.assertEqualDiff(self._chunks_to_repr_lines(expected),
                             self._chunks_to_repr_lines(actual))


class TestAllGroupCompressors(TestGroupCompressor):
    """Tests for GroupCompressor"""

    scenarios = group_compress_implementation_scenarios()
    compressor = None # Set by scenario

    def test_empty_delta(self):
        compressor = self.compressor()
        self.assertEqual([], compressor.chunks)

    def test_one_nosha_delta(self):
        compressor = self.compressor()
        sha1, start_point, end_point, _ = compressor.compress(('label',),
            b'strange\ncommon\n', None)
        self.assertEqual(sha_string(b'strange\ncommon\n'), sha1)
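        # A record inside a group is encoded as a marker byte ('f' for a
        # fulltext, 'd' for a delta), a base-128 length prefix, and then the
        # content itself; here \x0f is the 15-byte length of
        # b'strange\ncommon\n'.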
        expected_lines = b'f\x0fstrange\ncommon\n'
        self.assertEqual(expected_lines, b''.join(compressor.chunks))
        self.assertEqual(0, start_point)
        self.assertEqual(len(expected_lines), end_point)

    def test_empty_content(self):
        compressor = self.compressor()
        # Adding empty bytes should return the 'null' record
        sha1, start_point, end_point, kind = compressor.compress(('empty',),
                                                                  b'', None)
        self.assertEqual(0, start_point)
        self.assertEqual(0, end_point)
        self.assertEqual('fulltext', kind)
        self.assertEqual(groupcompress._null_sha1, sha1)
        self.assertEqual(0, compressor.endpoint)
        self.assertEqual([], compressor.chunks)
        # Even after adding some content
        compressor.compress(('content',), b'some\nbytes\n', None)
        self.assertTrue(compressor.endpoint > 0)
        sha1, start_point, end_point, kind = compressor.compress(('empty2',),
                                                                  b'', None)
        self.assertEqual(0, start_point)
        self.assertEqual(0, end_point)
        self.assertEqual('fulltext', kind)
        self.assertEqual(groupcompress._null_sha1, sha1)

    def test_extract_from_compressor(self):
        # Knit fetching will try to reconstruct texts locally which results in
        # reading something that is in the compressor stream already.
        compressor = self.compressor()
        sha1_1, _, _, _ = compressor.compress(('label',),
            b'strange\ncommon long line\nthat needs a 16 byte match\n', None)
        expected_lines = list(compressor.chunks)
        sha1_2, _, end_point, _ = compressor.compress(('newlabel',),
            b'common long line\nthat needs a 16 byte match\ndifferent\n', None)
        self.assertEqual((b'strange\ncommon long line\n'
                          b'that needs a 16 byte match\n', sha1_1),
                         compressor.extract(('label',)))
        self.assertEqual((b'common long line\nthat needs a 16 byte match\n'
                          b'different\n', sha1_2),
                         compressor.extract(('newlabel',)))

    def test_pop_last(self):
        compressor = self.compressor()
        _, _, _, _ = compressor.compress(('key1',),
            b'some text\nfor the first entry\n', None)
        expected_lines = list(compressor.chunks)
        _, _, _, _ = compressor.compress(('key2',),
            b'some text\nfor the second entry\n', None)
        compressor.pop_last()
        self.assertEqual(expected_lines, compressor.chunks)


class TestPyrexGroupCompressor(TestGroupCompressor):

    _test_needs_features = [compiled_groupcompress_feature]
    compressor = groupcompress.PyrexGroupCompressor

    def test_stats(self):
        compressor = self.compressor()
        compressor.compress(('label',),
                            b'strange\n'
                            b'common very very long line\n'
                            b'plus more text\n', None)
        compressor.compress(('newlabel',),
                            b'common very very long line\n'
                            b'plus more text\n'
                            b'different\n'
                            b'moredifferent\n', None)
        compressor.compress(('label3',),
                            b'new\n'
                            b'common very very long line\n'
                            b'plus more text\n'
                            b'different\n'
                            b'moredifferent\n', None)
        self.assertAlmostEqual(1.9, compressor.ratio(), 1)
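
    # The expected delta bytes in the tests below use the groupcompress
    # copy/insert instruction encoding: an insert is a command byte of 1-127
    # giving the number of literal bytes that follow, while a copy has the
    # high bit set, with the low bits saying which offset/length bytes follow
    # (0x91 = 0x80 | 0x01 | 0x10: one offset byte, then one length byte).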

    def test_two_nosha_delta(self):
        compressor = self.compressor()
        sha1_1, _, _, _ = compressor.compress(('label',),
            b'strange\ncommon long line\nthat needs a 16 byte match\n', None)
        expected_lines = list(compressor.chunks)
        sha1_2, start_point, end_point, _ = compressor.compress(('newlabel',),
            b'common long line\nthat needs a 16 byte match\ndifferent\n', None)
        self.assertEqual(sha_string(b'common long line\n'
                                    b'that needs a 16 byte match\n'
                                    b'different\n'), sha1_2)
        expected_lines.extend([
            # 'delta', delta length
            b'd\x0f',
            # source and target length
            b'\x36',
            # copy the line common
            b'\x91\x0a\x2c', #copy, offset 0x0a, len 0x2c
            # add the line different, and the trailing newline
            b'\x0adifferent\n', # insert 10 bytes
            ])
        self.assertEqualDiffEncoded(expected_lines, compressor.chunks)
        self.assertEqual(sum(map(len, expected_lines)), end_point)

    def test_three_nosha_delta(self):
        # The first interesting test: make a change that should use lines from
        # both parents.
        compressor = self.compressor()
        sha1_1, _, _, _ = compressor.compress(('label',),
            b'strange\ncommon very very long line\nwith some extra text\n', None)
        sha1_2, _, _, _ = compressor.compress(('newlabel',),
            b'different\nmoredifferent\nand then some more\n', None)
        expected_lines = list(compressor.chunks)
        sha1_3, start_point, end_point, _ = compressor.compress(('label3',),
            b'new\ncommon very very long line\nwith some extra text\n'
            b'different\nmoredifferent\nand then some more\n',
            None)
        self.assertEqual(
            sha_string(b'new\ncommon very very long line\nwith some extra text\n'
                       b'different\nmoredifferent\nand then some more\n'),
            sha1_3)
        expected_lines.extend([
            # 'delta', delta length
            b'd\x0b',
            # source and target length
            b'\x5f'
            # insert new
            b'\x03new',
            # Copy of first parent 'common' range
            b'\x91\x09\x31' # copy, offset 0x09, 0x31 bytes
            # Copy of second parent 'different' range
            b'\x91\x3c\x2b' # copy, offset 0x3c, 0x2b bytes
            ])
        self.assertEqualDiffEncoded(expected_lines, compressor.chunks)
        self.assertEqual(sum(map(len, expected_lines)), end_point)


class TestPythonGroupCompressor(TestGroupCompressor):

    compressor = groupcompress.PythonGroupCompressor

    def test_stats(self):
        compressor = self.compressor()
        compressor.compress(('label',),
                            b'strange\n'
                            b'common very very long line\n'
                            b'plus more text\n', None)
        compressor.compress(('newlabel',),
                            b'common very very long line\n'
                            b'plus more text\n'
                            b'different\n'
                            b'moredifferent\n', None)
        compressor.compress(('label3',),
                            b'new\n'
                            b'common very very long line\n'
                            b'plus more text\n'
                            b'different\n'
                            b'moredifferent\n', None)
        self.assertAlmostEqual(1.9, compressor.ratio(), 1)

    def test_two_nosha_delta(self):
        compressor = self.compressor()
        sha1_1, _, _, _ = compressor.compress(('label',),
            b'strange\ncommon long line\nthat needs a 16 byte match\n', None)
        expected_lines = list(compressor.chunks)
        sha1_2, start_point, end_point, _ = compressor.compress(('newlabel',),
            b'common long line\nthat needs a 16 byte match\ndifferent\n', None)
        self.assertEqual(sha_string(b'common long line\n'
                                    b'that needs a 16 byte match\n'
                                    b'different\n'), sha1_2)
        expected_lines.extend([
            # 'delta', delta length
            b'd\x0f',
            # target length
            b'\x36',
            # copy the line common
            b'\x91\x0a\x2c', #copy, offset 0x0a, len 0x2c
            # add the line different, and the trailing newline
            b'\x0adifferent\n', # insert 10 bytes
            ])
        self.assertEqualDiffEncoded(expected_lines, compressor.chunks)
        self.assertEqual(sum(map(len, expected_lines)), end_point)

    def test_three_nosha_delta(self):
        # The first interesting test: make a change that should use lines from
        # both parents.
        compressor = self.compressor()
        sha1_1, _, _, _ = compressor.compress(('label',),
            b'strange\ncommon very very long line\nwith some extra text\n', None)
        sha1_2, _, _, _ = compressor.compress(('newlabel',),
            b'different\nmoredifferent\nand then some more\n', None)
        expected_lines = list(compressor.chunks)
        sha1_3, start_point, end_point, _ = compressor.compress(('label3',),
            b'new\ncommon very very long line\nwith some extra text\n'
            b'different\nmoredifferent\nand then some more\n',
            None)
        self.assertEqual(
            sha_string(b'new\ncommon very very long line\nwith some extra text\n'
                       b'different\nmoredifferent\nand then some more\n'),
            sha1_3)
        expected_lines.extend([
            # 'delta', delta length
            b'd\x0c',
            # target length
            b'\x5f'
            # insert new
            b'\x04new\n',
            # Copy of first parent 'common' range
            b'\x91\x0a\x30' # copy, offset 0x0a, 0x30 bytes
            # Copy of second parent 'different' range
            b'\x91\x3c\x2b' # copy, offset 0x3c, 0x2b bytes
            ])
        self.assertEqualDiffEncoded(expected_lines, compressor.chunks)
        self.assertEqual(sum(map(len, expected_lines)), end_point)


class TestGroupCompressBlock(tests.TestCase):

    def make_block(self, key_to_text):
        """Create a GroupCompressBlock, filling it with the given texts."""
        compressor = groupcompress.GroupCompressor()
        for key in sorted(key_to_text):
            compressor.compress(key, key_to_text[key], None)
        locs = dict((key, (start, end)) for key, (start, _, end, _)
                    in compressor.labels_deltas.items())
        block = compressor.flush()
        raw_bytes = block.to_bytes()
        # Go through from_bytes(to_bytes()) so that we start with a compressed
        # content object
        return locs, groupcompress.GroupCompressBlock.from_bytes(raw_bytes)

    def test_from_empty_bytes(self):
        self.assertRaises(ValueError,
                          groupcompress.GroupCompressBlock.from_bytes, b'')

    def test_from_minimal_bytes(self):
        block = groupcompress.GroupCompressBlock.from_bytes(
            b'gcb1z\n0\n0\n')
        self.assertIsInstance(block, groupcompress.GroupCompressBlock)
        self.assertIs(None, block._content)
        self.assertEqual(b'', block._z_content)
        block._ensure_content()
        self.assertEqual(b'', block._content)
        self.assertEqual(b'', block._z_content)
        block._ensure_content() # Ensure content is safe to call 2x

    def test_from_invalid(self):
        self.assertRaises(ValueError,
                          groupcompress.GroupCompressBlock.from_bytes,
                          b'this is not a valid header')
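
    # The serialised block format exercised below is:
    #   b'gcb1z\n'   group compress block v1, zlib-compressed content
    #   b'%d\n%d\n'  compressed length, then uncompressed length
    #   <bytes>      the zlib-compressed group content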

    def test_from_bytes(self):
        content = (b'a tiny bit of content\n')
        z_content = zlib.compress(content)
        z_bytes = (
            b'gcb1z\n' # group compress block v1 zlib
            b'%d\n' # Length of compressed content
            b'%d\n' # Length of uncompressed content
            b'%s' # Compressed content
            ) % (len(z_content), len(content), z_content)
        block = groupcompress.GroupCompressBlock.from_bytes(
            z_bytes)
        self.assertEqual(z_content, block._z_content)
        self.assertIs(None, block._content)
        self.assertEqual(len(z_content), block._z_content_length)
        self.assertEqual(len(content), block._content_length)
        block._ensure_content()
        self.assertEqual(z_content, block._z_content)
        self.assertEqual(content, block._content)

    def test_to_chunks(self):
        content_chunks = [b'this is some content\n',
                          b'this content will be compressed\n']
        content_len = sum(map(len, content_chunks))
        content = b''.join(content_chunks)
        gcb = groupcompress.GroupCompressBlock()
        gcb.set_chunked_content(content_chunks, content_len)
        total_len, block_chunks = gcb.to_chunks()
        block_bytes = b''.join(block_chunks)
        self.assertEqual(gcb._z_content_length, len(gcb._z_content))
        self.assertEqual(total_len, len(block_bytes))
        self.assertEqual(gcb._content_length, content_len)
        expected_header = (b'gcb1z\n' # group compress block v1 zlib
                           b'%d\n' # Length of compressed content
                           b'%d\n' # Length of uncompressed content
                           ) % (gcb._z_content_length, gcb._content_length)
        # The first chunk should be the header chunk. It is small, fixed size,
        # and there is no compelling reason to split it up
        self.assertEqual(expected_header, block_chunks[0])
        self.assertStartsWith(block_bytes, expected_header)
        remaining_bytes = block_bytes[len(expected_header):]
        raw_bytes = zlib.decompress(remaining_bytes)
        self.assertEqual(content, raw_bytes)

    def test_to_bytes(self):
        content = (b'this is some content\n'
                   b'this content will be compressed\n')
        gcb = groupcompress.GroupCompressBlock()
        gcb.set_content(content)
        data = gcb.to_bytes()
        self.assertEqual(gcb._z_content_length, len(gcb._z_content))
        self.assertEqual(gcb._content_length, len(content))
        expected_header = (b'gcb1z\n' # group compress block v1 zlib
                           b'%d\n' # Length of compressed content
                           b'%d\n' # Length of uncompressed content
                           ) % (gcb._z_content_length, gcb._content_length)
        self.assertStartsWith(data, expected_header)
        remaining_bytes = data[len(expected_header):]
        raw_bytes = zlib.decompress(remaining_bytes)
        self.assertEqual(content, raw_bytes)

        # we should get the same results if using the chunked version
        gcb = groupcompress.GroupCompressBlock()
        gcb.set_chunked_content([b'this is some content\n'
                                 b'this content will be compressed\n'],
                                len(content))
        old_data = data
        data = gcb.to_bytes()
        self.assertEqual(old_data, data)

    def test_partial_decomp(self):
        content_chunks = []
        # We need a sufficient amount of data so that zlib.decompress has
        # partial decompression to work with. Most auto-generated data
        # compresses a bit too well, we want a combination, so we combine a sha
        # hash with compressible data.
        for i in range(2048):
            next_content = b'%d\nThis is a bit of duplicate text\n' % (i,)
            content_chunks.append(next_content)
            next_sha1 = osutils.sha_string(next_content)
            content_chunks.append(next_sha1 + b'\n')
        content = b''.join(content_chunks)
        self.assertEqual(158634, len(content))
        z_content = zlib.compress(content)
        self.assertEqual(57182, len(z_content))
        block = groupcompress.GroupCompressBlock()
        block._z_content_chunks = (z_content,)
        block._z_content_length = len(z_content)
        block._compressor_name = 'zlib'
        block._content_length = 158634
        self.assertIs(None, block._content)
        block._ensure_content(100)
        self.assertIsNot(None, block._content)
        # We have decompressed at least 100 bytes
        self.assertTrue(len(block._content) >= 100)
        # We have not decompressed the whole content
        self.assertTrue(len(block._content) < 158634)
        self.assertEqualDiff(content[:len(block._content)], block._content)
        # ensuring content that we already have shouldn't cause any more data
        # to be decompressed
        cur_len = len(block._content)
        block._ensure_content(cur_len - 10)
        self.assertEqual(cur_len, len(block._content))
        # Now we want a bit more content
        cur_len += 10
        block._ensure_content(cur_len)
        self.assertTrue(len(block._content) >= cur_len)
        self.assertTrue(len(block._content) < 158634)
        self.assertEqualDiff(content[:len(block._content)], block._content)
        # And now lets finish
        block._ensure_content(158634)
        self.assertEqualDiff(content, block._content)
        # And the decompressor is finalized
        self.assertIs(None, block._z_content_decompressor)

    def test__ensure_all_content(self):
        content_chunks = []
        # We need a sufficient amount of data so that zlib.decompress has
        # partial decompression to work with. Most auto-generated data
        # compresses a bit too well, we want a combination, so we combine a sha
        # hash with compressible data.
        for i in range(2048):
            next_content = b'%d\nThis is a bit of duplicate text\n' % (i,)
            content_chunks.append(next_content)
            next_sha1 = osutils.sha_string(next_content)
            content_chunks.append(next_sha1 + b'\n')
        content = b''.join(content_chunks)
        self.assertEqual(158634, len(content))
        z_content = zlib.compress(content)
        self.assertEqual(57182, len(z_content))
        block = groupcompress.GroupCompressBlock()
        block._z_content_chunks = (z_content,)
        block._z_content_length = len(z_content)
        block._compressor_name = 'zlib'
        block._content_length = 158634
        self.assertIs(None, block._content)
        # The first _ensure_content got all of the required data
        block._ensure_content(158634)
        self.assertEqualDiff(content, block._content)
        # And we should have released the _z_content_decompressor since it was
        # fully consumed
        self.assertIs(None, block._z_content_decompressor)

    def test__dump(self):
        dup_content = b'some duplicate content\nwhich is sufficiently long\n'
        key_to_text = {(b'1',): dup_content + b'1 unique\n',
                       (b'2',): dup_content + b'2 extra special\n'}
        locs, block = self.make_block(key_to_text)
        self.assertEqual([(b'f', len(key_to_text[(b'1',)])),
                          (b'd', 21, len(key_to_text[(b'2',)]),
                           [(b'c', 2, len(dup_content)),
                            (b'i', len(b'2 extra special\n'), b'')
                           ]),
                         ], block._dump())


class TestCaseWithGroupCompressVersionedFiles(
        tests.TestCaseWithMemoryTransport):

    def make_test_vf(self, create_graph, keylength=1, do_cleanup=True,
                     dir='.', inconsistency_fatal=True):
        t = self.get_transport(dir)
        t.ensure_base()
        vf = groupcompress.make_pack_factory(graph=create_graph,
            delta=False, keylength=keylength,
            inconsistency_fatal=inconsistency_fatal)(t)
        if do_cleanup:
            self.addCleanup(groupcompress.cleanup_pack_group, vf)
        return vf


class TestGroupCompressVersionedFiles(TestCaseWithGroupCompressVersionedFiles):

    def make_g_index(self, name, ref_lists=0, nodes=[]):
        builder = btree_index.BTreeBuilder(ref_lists)
        for node, references, value in nodes:
            builder.add_node(node, references, value)
        stream = builder.finish()
        trans = self.get_transport()
        size = trans.put_file(name, stream)
        return btree_index.BTreeGraphIndex(trans, name, size)

    def make_g_index_missing_parent(self):
        graph_index = self.make_g_index(b'missing_parent', 1,
            [((b'parent', ), b'2 78 2 10', ([],)),
             ((b'tip', ), b'2 78 2 10',
              ([(b'parent', ), (b'missing-parent', )],)),
              ])
        return graph_index

    def test_get_record_stream_as_requested(self):
        # Consider promoting 'as-requested' to general availability, and
        # make this a VF interface test
        vf = self.make_test_vf(False, dir='source')
        vf.add_lines((b'a',), (), [b'lines\n'])
        vf.add_lines((b'b',), (), [b'lines\n'])
        vf.add_lines((b'c',), (), [b'lines\n'])
        vf.add_lines((b'd',), (), [b'lines\n'])
        vf.writer.end()
        keys = [record.key for record in vf.get_record_stream(
                [(b'a',), (b'b',), (b'c',), (b'd',)],
                'as-requested', False)]
        self.assertEqual([(b'a',), (b'b',), (b'c',), (b'd',)], keys)
        keys = [record.key for record in vf.get_record_stream(
                [(b'b',), (b'a',), (b'd',), (b'c',)],
                'as-requested', False)]
        self.assertEqual([(b'b',), (b'a',), (b'd',), (b'c',)], keys)

        # It should work even after being repacked into another VF
        vf2 = self.make_test_vf(False, dir='target')
        vf2.insert_record_stream(vf.get_record_stream(
            [(b'b',), (b'a',), (b'd',), (b'c',)], 'as-requested', False))
        vf2.writer.end()

        keys = [record.key for record in vf2.get_record_stream(
                [(b'a',), (b'b',), (b'c',), (b'd',)],
                'as-requested', False)]
        self.assertEqual([(b'a',), (b'b',), (b'c',), (b'd',)], keys)
        keys = [record.key for record in vf2.get_record_stream(
                [(b'b',), (b'a',), (b'd',), (b'c',)],
                'as-requested', False)]
        self.assertEqual([(b'b',), (b'a',), (b'd',), (b'c',)], keys)

    def test_get_record_stream_max_bytes_to_index_default(self):
        vf = self.make_test_vf(True, dir='source')
        vf.add_lines((b'a',), (), [b'lines\n'])
        vf.writer.end()
        record = next(vf.get_record_stream([(b'a',)], 'unordered', True))
        self.assertEqual(vf._DEFAULT_COMPRESSOR_SETTINGS,
                         record._manager._get_compressor_settings())

    def test_get_record_stream_accesses_compressor_settings(self):
        vf = self.make_test_vf(True, dir='source')
        vf.add_lines((b'a',), (), [b'lines\n'])
        vf.writer.end()
        vf._max_bytes_to_index = 1234
        record = next(vf.get_record_stream([(b'a',)], 'unordered', True))
        self.assertEqual(dict(max_bytes_to_index=1234),
                         record._manager._get_compressor_settings())

    def grouped_stream(self, revision_ids, first_parents=()):
        parents = first_parents
        for revision_id in revision_ids:
            key = (revision_id,)
            record = versionedfile.FulltextContentFactory(
                key, parents, None,
                b'some content that is\n'
                b'identical except for\n'
                b'revision_id:%s\n' % (revision_id,))
            yield record
            parents = (key,)
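
    # grouped_stream() chains each record to the one before it (and, via
    # first_parents, to an earlier batch), so the tests below rely on each
    # insert_record_stream() call producing its own compression group.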

    def test_insert_record_stream_reuses_blocks(self):
        vf = self.make_test_vf(True, dir='source')
        vf.insert_record_stream(self.grouped_stream([b'a', b'b', b'c', b'd']))
        vf.insert_record_stream(self.grouped_stream(
            [b'e', b'f', b'g', b'h'], first_parents=((b'd',),)))
        block_bytes = {}
        stream = vf.get_record_stream(
            [(r.encode(),) for r in 'abcdefgh'], 'unordered', False)
        num_records = 0
        for record in stream:
            if record.key in [(b'a',), (b'e',)]:
                self.assertEqual('groupcompress-block', record.storage_kind)
            else:
                self.assertEqual('groupcompress-block-ref',
                                 record.storage_kind)
            block_bytes[record.key] = record._manager._block._z_content
            num_records += 1
        self.assertEqual(8, num_records)
        for r in 'abcd':
            key = (r.encode(),)
            self.assertIs(block_bytes[key], block_bytes[(b'a',)])
            self.assertNotEqual(block_bytes[key], block_bytes[(b'e',)])
        for r in 'efgh':
            key = (r.encode(),)
            self.assertIs(block_bytes[key], block_bytes[(b'e',)])
            self.assertNotEqual(block_bytes[key], block_bytes[(b'a',)])
        # Now copy the blocks into another vf, and ensure that the blocks are
        # preserved without creating new entries
        vf2 = self.make_test_vf(True, dir='target')
        keys = [(r.encode(),) for r in 'abcdefgh']
        # ordering in 'groupcompress' order, should actually swap the groups in
        # the target vf, but the groups themselves should not be disturbed.
        def small_size_stream():
            for record in vf.get_record_stream(keys, 'groupcompress', False):
                record._manager._full_enough_block_size = \
                    record._manager._block._content_length
                yield record

        vf2.insert_record_stream(small_size_stream())
        stream = vf2.get_record_stream(keys, 'groupcompress', False)
        vf2.writer.end()
        num_records = 0
        for record in stream:
            num_records += 1
            self.assertEqual(block_bytes[record.key],
                             record._manager._block._z_content)
        self.assertEqual(8, num_records)

    def test_insert_record_stream_packs_on_the_fly(self):
        vf = self.make_test_vf(True, dir='source')
        vf.insert_record_stream(self.grouped_stream([b'a', b'b', b'c', b'd']))
        vf.insert_record_stream(self.grouped_stream(
            [b'e', b'f', b'g', b'h'], first_parents=((b'd',),)))
        # Now copy the blocks into another vf, and see that the
        # insert_record_stream rebuilt a new block on-the-fly because of
        # the source blocks not being full enough to simply reuse.
        vf2 = self.make_test_vf(True, dir='target')
        keys = [(r.encode(),) for r in 'abcdefgh']
        vf2.insert_record_stream(vf.get_record_stream(
            keys, 'groupcompress', False))
        stream = vf2.get_record_stream(keys, 'groupcompress', False)
        vf2.writer.end()
        num_records = 0
        # All of the records should be recombined into a single block
        block = None
        for record in stream:
            num_records += 1
            if block is None:
                block = record._manager._block
            else:
                self.assertIs(block, record._manager._block)
        self.assertEqual(8, num_records)

    def test__insert_record_stream_no_reuse_block(self):
        vf = self.make_test_vf(True, dir='source')
        vf.insert_record_stream(self.grouped_stream([b'a', b'b', b'c', b'd']))
        vf.insert_record_stream(self.grouped_stream(
            [b'e', b'f', b'g', b'h'], first_parents=((b'd',),)))
        vf.writer.end()
        keys = [(r.encode(),) for r in 'abcdefgh']
        self.assertEqual(8, len(list(
            vf.get_record_stream(keys, 'unordered', False))))
        # Now copy the blocks into another vf, and ensure that the blocks are
        # preserved without creating new entries
        vf2 = self.make_test_vf(True, dir='target')
        # ordering in 'groupcompress' order, should actually swap the groups in
        # the target vf, but the groups themselves should not be disturbed.
        list(vf2._insert_record_stream(vf.get_record_stream(
            keys, 'groupcompress', False),
            reuse_blocks=False))

        # After inserting with reuse_blocks=False, we should have everything in
        # a single new block.
        stream = vf2.get_record_stream(keys, 'groupcompress', False)
        block = None
        for record in stream:
            if block is None:
                block = record._manager._block
            else:
                self.assertIs(block, record._manager._block)

    def test_add_missing_noncompression_parent_unvalidated_index(self):
        unvalidated = self.make_g_index_missing_parent()
        combined = _mod_index.CombinedGraphIndex([unvalidated])
        index = groupcompress._GCGraphIndex(combined,
            is_locked=lambda: True, parents=True,
            track_external_parent_refs=True)
        index.scan_unvalidated_index(unvalidated)
        self.assertEqual(
            frozenset([(b'missing-parent',)]), index.get_missing_parents())

    def test_track_external_parent_refs(self):
        g_index = self.make_g_index('empty', 1, [])
        mod_index = btree_index.BTreeBuilder(1, 1)
        combined = _mod_index.CombinedGraphIndex([g_index, mod_index])
        index = groupcompress._GCGraphIndex(combined,
            is_locked=lambda: True, parents=True,
            add_callback=mod_index.add_nodes,
            track_external_parent_refs=True)
        index.add_records([
            ((b'new-key',), b'2 10 2 10', [((b'parent-1',), (b'parent-2',))])])
        self.assertEqual(
            frozenset([(b'parent-1',), (b'parent-2',)]),
            index.get_missing_parents())

    def make_source_with_b(self, a_parent, path):
        source = self.make_test_vf(True, dir=path)
        source.add_lines((b'a',), (), [b'lines\n'])
        if a_parent:
            b_parents = ((b'a',),)
        else:
            b_parents = ()
        source.add_lines((b'b',), b_parents, [b'lines\n'])
        return source

    def do_inconsistent_inserts(self, inconsistency_fatal):
        target = self.make_test_vf(True, dir='target',
                                   inconsistency_fatal=inconsistency_fatal)
        for x in range(2):
            source = self.make_source_with_b(x==1, 'source%s' % x)
            target.insert_record_stream(source.get_record_stream(
                [(b'b',)], 'unordered', False))

    def test_inconsistent_redundant_inserts_warn(self):
        """Should not insert a record that is already present."""
        warnings = []
        def warning(template, args):
            warnings.append(template % args)
        _trace_warning = trace.warning
        trace.warning = warning
        try:
            self.do_inconsistent_inserts(inconsistency_fatal=False)
        finally:
            trace.warning = _trace_warning
        self.assertContainsRe(
            warnings[0],
            r"^inconsistent details in skipped record: \(b?'b',\)"
            r" \(b?'42 32 0 8', \(\(\),\)\)"
            r" \(b?'74 32 0 8', \(\(\(b?'a',\),\),\)\)$")

    def test_inconsistent_redundant_inserts_raises(self):
        e = self.assertRaises(knit.KnitCorrupt, self.do_inconsistent_inserts,
                              inconsistency_fatal=True)
        self.assertContainsRe(str(e), r"Knit.* corrupt: inconsistent details"
                              r" \(b?'b',\) \(b?'42 32 0 8', \(\(\),\)\)"
                              r" \(b?'74 32 0 8', \(\(\(b?'a',\),\),\)\)")

    def test_clear_cache(self):
        vf = self.make_source_with_b(True, 'source')
        vf.writer.end()
        for record in vf.get_record_stream([(b'a',), (b'b',)], 'unordered',
                                            True):
            pass
        self.assertTrue(len(vf._group_cache) > 0)
        vf.clear_cache()
        self.assertEqual(0, len(vf._group_cache))
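

# The tests below poke at the 'bzr.groupcompress.max_bytes_to_index' config
# option, which caps how many bytes of each text the delta index is willing
# to index; the value is read into vf._max_bytes_to_index and passed through
# to the compiled compressor's delta index when that implementation is in use.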
class TestGroupCompressConfig(tests.TestCaseWithTransport):

    def make_test_vf(self):
        t = self.get_transport('.')
        t.ensure_base()
        factory = groupcompress.make_pack_factory(graph=True,
            delta=False, keylength=1, inconsistency_fatal=True)
        vf = factory(t)
        self.addCleanup(groupcompress.cleanup_pack_group, vf)
        return vf

    def test_max_bytes_to_index_default(self):
        vf = self.make_test_vf()
        gc = vf._make_group_compressor()
        self.assertEqual(vf._DEFAULT_MAX_BYTES_TO_INDEX,
                         vf._max_bytes_to_index)
        if isinstance(gc, groupcompress.PyrexGroupCompressor):
            self.assertEqual(vf._DEFAULT_MAX_BYTES_TO_INDEX,
                             gc._delta_index._max_bytes_to_index)

    def test_max_bytes_to_index_in_config(self):
        c = config.GlobalConfig()
        c.set_user_option('bzr.groupcompress.max_bytes_to_index', '10000')
        vf = self.make_test_vf()
        gc = vf._make_group_compressor()
        self.assertEqual(10000, vf._max_bytes_to_index)
        if isinstance(gc, groupcompress.PyrexGroupCompressor):
            self.assertEqual(10000, gc._delta_index._max_bytes_to_index)

    def test_max_bytes_to_index_bad_config(self):
        c = config.GlobalConfig()
        c.set_user_option('bzr.groupcompress.max_bytes_to_index', 'boogah')
        vf = self.make_test_vf()
        # TODO: This is triggering a warning, we might want to trap and make
        # sure it is readable.
        gc = vf._make_group_compressor()
        self.assertEqual(vf._DEFAULT_MAX_BYTES_TO_INDEX,
                         vf._max_bytes_to_index)
        if isinstance(gc, groupcompress.PyrexGroupCompressor):
            self.assertEqual(vf._DEFAULT_MAX_BYTES_TO_INDEX,
                             gc._delta_index._max_bytes_to_index)


class StubGCVF(object):
    def __init__(self, canned_get_blocks=None):
        self._group_cache = {}
        self._canned_get_blocks = canned_get_blocks or []

    def _get_blocks(self, read_memos):
        return iter(self._canned_get_blocks)


class Test_BatchingBlockFetcher(TestCaseWithGroupCompressVersionedFiles):
    """Simple whitebox unit tests for _BatchingBlockFetcher."""

    def test_add_key_new_read_memo(self):
        """Adding a key with an uncached read_memo new to this batch adds that
        read_memo to the list of memos to fetch.
        """
        # locations are: index_memo, ignored, parents, ignored
        # where index_memo is: (idx, offset, len, factory_start, factory_end)
        # and (idx, offset, size) is known as the 'read_memo', identifying the
        # raw bytes needed.
        read_memo = ('fake index', 100, 50)
        locations = {
            ('key',): (read_memo + (None, None), None, None, None)}
        batcher = groupcompress._BatchingBlockFetcher(StubGCVF(), locations)
        total_size = batcher.add_key(('key',))
        self.assertEqual(50, total_size)
        self.assertEqual([('key',)], batcher.keys)
        self.assertEqual([read_memo], batcher.memos_to_get)

    def test_add_key_duplicate_read_memo(self):
        """read_memos that occur multiple times in a batch will only be fetched
        once.
        """
        read_memo = ('fake index', 100, 50)
        # Two keys, both sharing the same read memo (but different overall
        # index_memos).
        locations = {
            ('key1',): (read_memo + (0, 1), None, None, None),
            ('key2',): (read_memo + (1, 2), None, None, None)}
        batcher = groupcompress._BatchingBlockFetcher(StubGCVF(), locations)
        total_size = batcher.add_key(('key1',))
        total_size = batcher.add_key(('key2',))
        self.assertEqual(50, total_size)
        self.assertEqual([('key1',), ('key2',)], batcher.keys)
        self.assertEqual([read_memo], batcher.memos_to_get)

    def test_add_key_cached_read_memo(self):
        """Adding a key with a cached read_memo will not cause that read_memo
        to be added to the list to fetch.
        """
        read_memo = ('fake index', 100, 50)
        gcvf = StubGCVF()
        gcvf._group_cache[read_memo] = 'fake block'
        locations = {
            ('key',): (read_memo + (None, None), None, None, None)}
        batcher = groupcompress._BatchingBlockFetcher(gcvf, locations)
        total_size = batcher.add_key(('key',))
        self.assertEqual(0, total_size)
        self.assertEqual([('key',)], batcher.keys)
        self.assertEqual([], batcher.memos_to_get)

    def test_yield_factories_empty(self):
        """An empty batch yields no factories."""
        batcher = groupcompress._BatchingBlockFetcher(StubGCVF(), {})
        self.assertEqual([], list(batcher.yield_factories()))

    def test_yield_factories_calls_get_blocks(self):
        """Uncached memos are retrieved via get_blocks."""
        read_memo1 = ('fake index', 100, 50)
        read_memo2 = ('fake index', 150, 40)
        gcvf = StubGCVF(
            canned_get_blocks=[
                (read_memo1, groupcompress.GroupCompressBlock()),
                (read_memo2, groupcompress.GroupCompressBlock())])
        locations = {
            ('key1',): (read_memo1 + (0, 0), None, None, None),
            ('key2',): (read_memo2 + (0, 0), None, None, None)}
        batcher = groupcompress._BatchingBlockFetcher(gcvf, locations)
        batcher.add_key(('key1',))
        batcher.add_key(('key2',))
        factories = list(batcher.yield_factories(full_flush=True))
        self.assertLength(2, factories)
        keys = [f.key for f in factories]
        kinds = [f.storage_kind for f in factories]
        self.assertEqual([('key1',), ('key2',)], keys)
        self.assertEqual(['groupcompress-block', 'groupcompress-block'], kinds)

    def test_yield_factories_flushing(self):
        """yield_factories holds back on yielding results from the final block
        unless passed full_flush=True.
        """
        fake_block = groupcompress.GroupCompressBlock()
        read_memo = ('fake index', 100, 50)
        gcvf = StubGCVF()
        gcvf._group_cache[read_memo] = fake_block
        locations = {
            ('key',): (read_memo + (0, 0), None, None, None)}
        batcher = groupcompress._BatchingBlockFetcher(gcvf, locations)
        batcher.add_key(('key',))
        self.assertEqual([], list(batcher.yield_factories()))
        factories = list(batcher.yield_factories(full_flush=True))
        self.assertLength(1, factories)
        self.assertEqual(('key',), factories[0].key)
        self.assertEqual('groupcompress-block', factories[0].storage_kind)


class TestLazyGroupCompress(tests.TestCaseWithTransport):

    _texts = {
        (b'key1',): b"this is a text\n"
                    b"with a reasonable amount of compressible bytes\n"
                    b"which can be shared between various other texts\n",
        (b'key2',): b"another text\n"
                    b"with a reasonable amount of compressible bytes\n"
                    b"which can be shared between various other texts\n",
        (b'key3',): b"yet another text which won't be extracted\n"
                    b"with a reasonable amount of compressible bytes\n"
                    b"which can be shared between various other texts\n",
        (b'key4',): b"this will be extracted\n"
                    b"but references most of its bytes from\n"
                    b"yet another text which won't be extracted\n"
                    b"with a reasonable amount of compressible bytes\n"
                    b"which can be shared between various other texts\n",
    }

    def make_block(self, key_to_text):
        """Create a GroupCompressBlock, filling it with the given texts."""
        compressor = groupcompress.GroupCompressor()
        for key in sorted(key_to_text):
            compressor.compress(key, key_to_text[key], None)
        locs = dict((key, (start, end)) for key, (start, _, end, _)
                    in compressor.labels_deltas.items())
        block = compressor.flush()
        raw_bytes = block.to_bytes()
        return locs, groupcompress.GroupCompressBlock.from_bytes(raw_bytes)

    def add_key_to_manager(self, key, locations, block, manager):
        start, end = locations[key]
        manager.add_factory(key, (), start, end)

    def make_block_and_full_manager(self, texts):
        locations, block = self.make_block(texts)
        manager = groupcompress._LazyGroupContentManager(block)
        for key in sorted(texts):
            self.add_key_to_manager(key, locations, block, manager)
        return block, manager

    def test_get_fulltexts(self):
        locations, block = self.make_block(self._texts)
        manager = groupcompress._LazyGroupContentManager(block)
        self.add_key_to_manager((b'key1',), locations, block, manager)
        self.add_key_to_manager((b'key2',), locations, block, manager)
        result_order = []
        for record in manager.get_record_stream():
            result_order.append(record.key)
            text = self._texts[record.key]
            self.assertEqual(text, record.get_bytes_as('fulltext'))
        self.assertEqual([(b'key1',), (b'key2',)], result_order)

        # If we build the manager in the opposite order, we should get them
        # back in the opposite order
        manager = groupcompress._LazyGroupContentManager(block)
        self.add_key_to_manager((b'key2',), locations, block, manager)
        self.add_key_to_manager((b'key1',), locations, block, manager)
        result_order = []
        for record in manager.get_record_stream():
            result_order.append(record.key)
            text = self._texts[record.key]
            self.assertEqual(text, record.get_bytes_as('fulltext'))
        self.assertEqual([(b'key2',), (b'key1',)], result_order)
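
    # _wire_bytes() serialises a manager as a b'groupcompress-block\n' kind
    # line, three decimal length lines (compressed header, uncompressed
    # header, block), then the zlib-compressed header describing each wanted
    # record, then the block bytes themselves. The tests below check that
    # layout directly.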

    def test__wire_bytes_no_keys(self):
        locations, block = self.make_block(self._texts)
        manager = groupcompress._LazyGroupContentManager(block)
        wire_bytes = manager._wire_bytes()
        block_length = len(block.to_bytes())
        # We should have triggered a strip, since we aren't using any content
        stripped_block = manager._block.to_bytes()
        self.assertTrue(block_length > len(stripped_block))
        empty_z_header = zlib.compress(b'')
        self.assertEqual(b'groupcompress-block\n'
                         b'8\n' # len(compress(''))
                         b'0\n' # len('')
                         b'%d\n'# compressed block len
                         b'%s'  # zheader
                         b'%s'  # block
                         % (len(stripped_block), empty_z_header,
                            stripped_block),
                         wire_bytes)

    def test__wire_bytes(self):
        locations, block = self.make_block(self._texts)
        manager = groupcompress._LazyGroupContentManager(block)
        self.add_key_to_manager((b'key1',), locations, block, manager)
        self.add_key_to_manager((b'key4',), locations, block, manager)
        block_bytes = block.to_bytes()
        wire_bytes = manager._wire_bytes()
        (storage_kind, z_header_len, header_len,
         block_len, rest) = wire_bytes.split(b'\n', 4)
        z_header_len = int(z_header_len)
        header_len = int(header_len)
        block_len = int(block_len)
        self.assertEqual(b'groupcompress-block', storage_kind)
        self.assertEqual(34, z_header_len)
        self.assertEqual(26, header_len)
        self.assertEqual(len(block_bytes), block_len)
        z_header = rest[:z_header_len]
        header = zlib.decompress(z_header)
        self.assertEqual(header_len, len(header))
        entry1 = locations[(b'key1',)]
        entry4 = locations[(b'key4',)]
        self.assertEqualDiff(b'key1\n'
                             b'\n'   # no parents
                             b'%d\n' # start offset
                             b'%d\n' # end offset
                             b'key4\n'
                             b'\n'
                             b'%d\n'
                             b'%d\n'
                             % (entry1[0], entry1[1],
                                entry4[0], entry4[1]),
                             header)
        z_block = rest[z_header_len:]
        self.assertEqual(block_bytes, z_block)

    def test_from_bytes(self):
        locations, block = self.make_block(self._texts)
        manager = groupcompress._LazyGroupContentManager(block)
        self.add_key_to_manager((b'key1',), locations, block, manager)
        self.add_key_to_manager((b'key4',), locations, block, manager)
        wire_bytes = manager._wire_bytes()
        self.assertStartsWith(wire_bytes, b'groupcompress-block\n')
        manager = groupcompress._LazyGroupContentManager.from_bytes(wire_bytes)
        self.assertIsInstance(manager, groupcompress._LazyGroupContentManager)
        self.assertEqual(2, len(manager._factories))
        self.assertEqual(block._z_content, manager._block._z_content)
        result_order = []
        for record in manager.get_record_stream():
            result_order.append(record.key)
            text = self._texts[record.key]
            self.assertEqual(text, record.get_bytes_as('fulltext'))
        self.assertEqual([(b'key1',), (b'key4',)], result_order)

    def test__check_rebuild_no_changes(self):
        block, manager = self.make_block_and_full_manager(self._texts)
        manager._check_rebuild_block()
        self.assertIs(block, manager._block)

    def test__check_rebuild_only_one(self):
        locations, block = self.make_block(self._texts)
        manager = groupcompress._LazyGroupContentManager(block)
        # Request just the first key, which should trigger a 'strip' action
        self.add_key_to_manager((b'key1',), locations, block, manager)
        manager._check_rebuild_block()
        self.assertIsNot(block, manager._block)
        self.assertTrue(block._content_length > manager._block._content_length)
        # We should be able to still get the content out of this block, though
        # it should only have 1 entry
        for record in manager.get_record_stream():
            self.assertEqual((b'key1',), record.key)
            self.assertEqual(self._texts[record.key],
                             record.get_bytes_as('fulltext'))

    def test__check_rebuild_middle(self):
        locations, block = self.make_block(self._texts)
        manager = groupcompress._LazyGroupContentManager(block)
        # Requesting a small key in the middle should trigger a 'rebuild'
        self.add_key_to_manager((b'key4',), locations, block, manager)
        manager._check_rebuild_block()
        self.assertIsNot(block, manager._block)
        self.assertTrue(block._content_length > manager._block._content_length)
        for record in manager.get_record_stream():
            self.assertEqual((b'key4',), record.key)
            self.assertEqual(self._texts[record.key],
                             record.get_bytes_as('fulltext'))

    def test_manager_default_compressor_settings(self):
        locations, old_block = self.make_block(self._texts)
        manager = groupcompress._LazyGroupContentManager(old_block)
        gcvf = groupcompress.GroupCompressVersionedFiles
        # It doesn't greedily evaluate _max_bytes_to_index
        self.assertIs(None, manager._compressor_settings)
        self.assertEqual(gcvf._DEFAULT_COMPRESSOR_SETTINGS,
                         manager._get_compressor_settings())

    def test_manager_custom_compressor_settings(self):
        locations, old_block = self.make_block(self._texts)
        called = []
        def compressor_settings():
            called.append('called')
            return (10,)
        manager = groupcompress._LazyGroupContentManager(old_block,
            get_compressor_settings=compressor_settings)
        gcvf = groupcompress.GroupCompressVersionedFiles
        # It doesn't greedily evaluate compressor_settings
        self.assertIs(None, manager._compressor_settings)
        self.assertEqual((10,), manager._get_compressor_settings())
        self.assertEqual((10,), manager._get_compressor_settings())
        self.assertEqual((10,), manager._compressor_settings)
        # Only called 1 time
        self.assertEqual(['called'], called)

    def test__rebuild_handles_compressor_settings(self):
        # Skip unless the compiled compressor is the default implementation.
        if not issubclass(groupcompress.GroupCompressor,
                          groupcompress.PyrexGroupCompressor):
            raise tests.TestNotApplicable('pure-python compressor'
                ' does not handle compressor_settings')
        locations, old_block = self.make_block(self._texts)
        manager = groupcompress._LazyGroupContentManager(old_block,
            get_compressor_settings=lambda: dict(max_bytes_to_index=32))
        gc = manager._make_group_compressor()
        self.assertEqual(32, gc._delta_index._max_bytes_to_index)
        self.add_key_to_manager((b'key3',), locations, old_block, manager)
        self.add_key_to_manager((b'key4',), locations, old_block, manager)
        action, last_byte, total_bytes = manager._check_rebuild_action()
        self.assertEqual('rebuild', action)
        manager._rebuild_block()
        new_block = manager._block
        self.assertIsNot(old_block, new_block)
        # Because of the new max_bytes_to_index, we do a poor job of
        # rebuilding. This is a side-effect of the change, but at least it does
        # show the setting had an effect.
        self.assertTrue(old_block._content_length < new_block._content_length)

    def test_check_is_well_utilized_all_keys(self):
        block, manager = self.make_block_and_full_manager(self._texts)
        self.assertFalse(manager.check_is_well_utilized())
        # Though we can fake it by changing the recommended minimum size
        manager._full_enough_block_size = block._content_length
        self.assertTrue(manager.check_is_well_utilized())
        # Setting it just above causes it to fail
        manager._full_enough_block_size = block._content_length + 1
        self.assertFalse(manager.check_is_well_utilized())
        # Setting the mixed-block size doesn't do anything, because the content
        # is considered to not be 'mixed'
        manager._full_enough_mixed_block_size = block._content_length
        self.assertFalse(manager.check_is_well_utilized())

    def test_check_is_well_utilized_mixed_keys(self):
        texts = {}
        f1k1 = (b'f1', b'k1')
        f1k2 = (b'f1', b'k2')
        f2k1 = (b'f2', b'k1')
        f2k2 = (b'f2', b'k2')
        texts[f1k1] = self._texts[(b'key1',)]
        texts[f1k2] = self._texts[(b'key2',)]
        texts[f2k1] = self._texts[(b'key3',)]
        texts[f2k2] = self._texts[(b'key4',)]
        block, manager = self.make_block_and_full_manager(texts)
        self.assertFalse(manager.check_is_well_utilized())
        manager._full_enough_block_size = block._content_length
        self.assertTrue(manager.check_is_well_utilized())
        manager._full_enough_block_size = block._content_length + 1
        self.assertFalse(manager.check_is_well_utilized())
        manager._full_enough_mixed_block_size = block._content_length
        self.assertTrue(manager.check_is_well_utilized())

    def test_check_is_well_utilized_partial_use(self):
        locations, block = self.make_block(self._texts)
        manager = groupcompress._LazyGroupContentManager(block)
        manager._full_enough_block_size = block._content_length
        self.add_key_to_manager((b'key1',), locations, block, manager)
        self.add_key_to_manager((b'key2',), locations, block, manager)
        # Just using the content from key1 and 2 is not enough to be considered
        # 'well utilized'
        self.assertFalse(manager.check_is_well_utilized())
        # However if we add key3, then we have enough, as we only require 75%
        # utilization
        self.add_key_to_manager((b'key4',), locations, block, manager)
        self.assertTrue(manager.check_is_well_utilized())
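

# _GCBuildDetails stands in for the 4-tuple that get_build_details used to
# return: (index_memo, compression_parent, parents, record_details). The
# tests below check that it still indexes and reprs like that tuple.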
class Test_GCBuildDetails(tests.TestCase):

    def test_acts_like_tuple(self):
        # _GCBuildDetails inlines some of the data that used to be spread out
        # across a bunch of tuples
        bd = groupcompress._GCBuildDetails((('parent1',), ('parent2',)),
            ('INDEX', 10, 20, 0, 5))
        self.assertEqual(4, len(bd))
        self.assertEqual(('INDEX', 10, 20, 0, 5), bd[0])
        self.assertEqual(None, bd[1]) # Compression Parent is always None
        self.assertEqual((('parent1',), ('parent2',)), bd[2])
        self.assertEqual(('group', None), bd[3]) # Record details

    def test__repr__(self):
        bd = groupcompress._GCBuildDetails((('parent1',), ('parent2',)),
            ('INDEX', 10, 20, 0, 5))
        self.assertEqual("_GCBuildDetails(('INDEX', 10, 20, 0, 5),"
                         " (('parent1',), ('parent2',)))",
                         repr(bd))