1
# Copyright (C) 2008-2011 Canonical Ltd
3
# This program is free software; you can redistribute it and/or modify
4
# it under the terms of the GNU General Public License as published by
5
# the Free Software Foundation; either version 2 of the License, or
6
# (at your option) any later version.
8
# This program is distributed in the hope that it will be useful,
9
# but WITHOUT ANY WARRANTY; without even the implied warranty of
10
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11
# GNU General Public License for more details.
13
# You should have received a copy of the GNU General Public License
14
# along with this program; if not, write to the Free Software
15
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
17
"""Tests for the python and pyrex extensions of groupcompress"""
27
from .scenarios import (
28
load_tests_apply_scenarios,
30
from ..sixish import (
38
def module_scenarios():
40
('python', {'_gc_module': _groupcompress_py}),
42
if compiled_groupcompress_feature.available():
43
gc_module = compiled_groupcompress_feature.module
44
scenarios.append(('C',
45
{'_gc_module': gc_module}))
49
def two_way_scenarios():
51
('PP', {'make_delta': _groupcompress_py.make_delta,
52
'apply_delta': _groupcompress_py.apply_delta})
54
if compiled_groupcompress_feature.available():
55
gc_module = compiled_groupcompress_feature.module
57
('CC', {'make_delta': gc_module.make_delta,
58
'apply_delta': gc_module.apply_delta}),
59
('PC', {'make_delta': _groupcompress_py.make_delta,
60
'apply_delta': gc_module.apply_delta}),
61
('CP', {'make_delta': gc_module.make_delta,
62
'apply_delta': _groupcompress_py.apply_delta}),
67
load_tests = load_tests_apply_scenarios
70
compiled_groupcompress_feature = features.ModuleAvailableFeature(
71
'breezy.bzr._groupcompress_pyx')
76
which is meant to be matched
83
which is meant to differ from
90
which is meant to be matched
94
at the end of the file
100
common with the next text
104
some more bit of text, that
105
does not have much in
106
common with the previous text
107
and has some extra text
113
has some in common with the previous text
114
and has some extra text
116
common with the next text
131
class TestMakeAndApplyDelta(tests.TestCase):
133
scenarios = module_scenarios()
134
_gc_module = None # Set by load_tests
137
super(TestMakeAndApplyDelta, self).setUp()
138
self.make_delta = self._gc_module.make_delta
139
self.apply_delta = self._gc_module.apply_delta
140
self.apply_delta_to_source = self._gc_module.apply_delta_to_source
142
def test_make_delta_is_typesafe(self):
143
self.make_delta(b'a string', b'another string')
145
def _check_make_delta(string1, string2):
146
self.assertRaises(TypeError, self.make_delta, string1, string2)
148
_check_make_delta(b'a string', object())
149
_check_make_delta(b'a string', u'not a string')
150
_check_make_delta(object(), b'a string')
151
_check_make_delta(u'not a string', b'a string')
153
def test_make_noop_delta(self):
154
ident_delta = self.make_delta(_text1, _text1)
155
self.assertEqual(b'M\x90M', ident_delta)
156
ident_delta = self.make_delta(_text2, _text2)
157
self.assertEqual(b'N\x90N', ident_delta)
158
ident_delta = self.make_delta(_text3, _text3)
159
self.assertEqual(b'\x87\x01\x90\x87', ident_delta)
161
def assertDeltaIn(self, delta1, delta2, delta):
162
"""Make sure that the delta bytes match one of the expectations."""
163
# In general, the python delta matcher gives different results than the
164
# pyrex delta matcher. Both should be valid deltas, though.
165
if delta not in (delta1, delta2):
166
self.fail(b"Delta bytes:\n"
170
% (delta, delta1, delta2))
172
def test_make_delta(self):
173
delta = self.make_delta(_text1, _text2)
175
b'N\x90/\x1fdiffer from\nagainst other text\n',
176
b'N\x90\x1d\x1ewhich is meant to differ from\n\x91:\x13',
178
delta = self.make_delta(_text2, _text1)
180
b'M\x90/\x1ebe matched\nagainst other text\n',
181
b'M\x90\x1d\x1dwhich is meant to be matched\n\x91;\x13',
183
delta = self.make_delta(_text3, _text1)
184
self.assertEqual(b'M\x90M', delta)
185
delta = self.make_delta(_text3, _text2)
187
b'N\x90/\x1fdiffer from\nagainst other text\n',
188
b'N\x90\x1d\x1ewhich is meant to differ from\n\x91:\x13',
191
def test_make_delta_with_large_copies(self):
192
# We want to have a copy that is larger than 64kB, which forces us to
193
# issue multiple copy instructions.
194
big_text = _text3 * 1220
195
delta = self.make_delta(big_text, big_text)
197
b'\xdc\x86\x0a' # Encoding the length of the uncompressed text
198
b'\x80' # Copy 64kB, starting at byte 0
199
b'\x84\x01' # and another 64kB starting at 64kB
200
b'\xb4\x02\x5c\x83', # And the bit of tail.
201
None, # Both implementations should be identical
204
def test_apply_delta_is_typesafe(self):
205
self.apply_delta(_text1, b'M\x90M')
206
self.assertRaises(TypeError, self.apply_delta, object(), b'M\x90M')
207
self.assertRaises(TypeError, self.apply_delta,
208
_text1.decode('latin1'), b'M\x90M')
209
self.assertRaises(TypeError, self.apply_delta, _text1, u'M\x90M')
210
self.assertRaises(TypeError, self.apply_delta, _text1, object())
212
def test_apply_delta(self):
213
target = self.apply_delta(_text1,
214
b'N\x90/\x1fdiffer from\nagainst other text\n')
215
self.assertEqual(_text2, target)
216
target = self.apply_delta(_text2,
217
b'M\x90/\x1ebe matched\nagainst other text\n')
218
self.assertEqual(_text1, target)
220
def test_apply_delta_to_source_is_safe(self):
221
self.assertRaises(TypeError,
222
self.apply_delta_to_source, object(), 0, 1)
223
self.assertRaises(TypeError,
224
self.apply_delta_to_source, u'unicode str', 0, 1)
226
self.assertRaises(ValueError,
227
self.apply_delta_to_source, b'foo', 1, 4)
229
self.assertRaises(ValueError,
230
self.apply_delta_to_source, b'foo', 5, 3)
232
self.assertRaises(ValueError,
233
self.apply_delta_to_source, b'foo', 3, 2)
235
def test_apply_delta_to_source(self):
236
source_and_delta = (_text1
237
+ b'N\x90/\x1fdiffer from\nagainst other text\n')
238
self.assertEqual(_text2, self.apply_delta_to_source(source_and_delta,
239
len(_text1), len(source_and_delta)))
242
class TestMakeAndApplyCompatible(tests.TestCase):
244
scenarios = two_way_scenarios()
246
make_delta = None # Set by load_tests
247
apply_delta = None # Set by load_tests
249
def assertMakeAndApply(self, source, target):
250
"""Assert that generating a delta and applying gives success."""
251
delta = self.make_delta(source, target)
252
bytes = self.apply_delta(source, delta)
253
self.assertEqualDiff(target, bytes)
255
def test_direct(self):
256
self.assertMakeAndApply(_text1, _text2)
257
self.assertMakeAndApply(_text2, _text1)
258
self.assertMakeAndApply(_text1, _text3)
259
self.assertMakeAndApply(_text3, _text1)
260
self.assertMakeAndApply(_text2, _text3)
261
self.assertMakeAndApply(_text3, _text2)
264
class TestDeltaIndex(tests.TestCase):
267
super(TestDeltaIndex, self).setUp()
268
# This test isn't multiplied, because we only have DeltaIndex for the
270
# We call this here, because _test_needs_features happens after setUp
271
self.requireFeature(compiled_groupcompress_feature)
272
self._gc_module = compiled_groupcompress_feature.module
275
di = self._gc_module.DeltaIndex(b'test text\n')
276
self.assertEqual('DeltaIndex(1, 10)', repr(di))
278
def test_sizeof(self):
279
di = self._gc_module.DeltaIndex()
280
# Exact value will depend on platform but should include sources
281
# source_info is a pointer and two longs so at least 12 bytes
282
lower_bound = di._max_num_sources * 12
283
self.assertGreater(sys.getsizeof(di), lower_bound)
285
def test__dump_no_index(self):
286
di = self._gc_module.DeltaIndex()
287
self.assertEqual(None, di._dump_index())
289
def test__dump_index_simple(self):
290
di = self._gc_module.DeltaIndex()
291
di.add_source(_text1, 0)
292
self.assertFalse(di._has_index())
293
self.assertEqual(None, di._dump_index())
294
_ = di.make_delta(_text1)
295
self.assertTrue(di._has_index())
296
hash_list, entry_list = di._dump_index()
297
self.assertEqual(16, len(hash_list))
298
self.assertEqual(68, len(entry_list))
299
just_entries = [(idx, text_offset, hash_val)
300
for idx, (text_offset, hash_val)
301
in enumerate(entry_list)
302
if text_offset != 0 or hash_val != 0]
303
rabin_hash = self._gc_module._rabin_hash
304
self.assertEqual([(8, 16, rabin_hash(_text1[1:17])),
305
(25, 48, rabin_hash(_text1[33:49])),
306
(34, 32, rabin_hash(_text1[17:33])),
307
(47, 64, rabin_hash(_text1[49:65])),
309
# This ensures that the hash map points to the location we expect it to
310
for entry_idx, text_offset, hash_val in just_entries:
311
self.assertEqual(entry_idx, hash_list[hash_val & 0xf])
313
def test__dump_index_two_sources(self):
314
di = self._gc_module.DeltaIndex()
315
di.add_source(_text1, 0)
316
di.add_source(_text2, 2)
317
start2 = len(_text1) + 2
318
self.assertTrue(di._has_index())
319
hash_list, entry_list = di._dump_index()
320
self.assertEqual(16, len(hash_list))
321
self.assertEqual(68, len(entry_list))
322
just_entries = [(idx, text_offset, hash_val)
323
for idx, (text_offset, hash_val)
324
in enumerate(entry_list)
325
if text_offset != 0 or hash_val != 0]
326
rabin_hash = self._gc_module._rabin_hash
327
self.assertEqual([(8, 16, rabin_hash(_text1[1:17])),
328
(9, start2 + 16, rabin_hash(_text2[1:17])),
329
(25, 48, rabin_hash(_text1[33:49])),
330
(30, start2 + 64, rabin_hash(_text2[49:65])),
331
(34, 32, rabin_hash(_text1[17:33])),
332
(35, start2 + 32, rabin_hash(_text2[17:33])),
333
(43, start2 + 48, rabin_hash(_text2[33:49])),
334
(47, 64, rabin_hash(_text1[49:65])),
336
# Each entry should be in the appropriate hash bucket.
337
for entry_idx, text_offset, hash_val in just_entries:
338
hash_idx = hash_val & 0xf
340
hash_list[hash_idx] <= entry_idx < hash_list[hash_idx + 1])
342
def test_first_add_source_doesnt_index_until_make_delta(self):
343
di = self._gc_module.DeltaIndex()
344
self.assertFalse(di._has_index())
345
di.add_source(_text1, 0)
346
self.assertFalse(di._has_index())
347
# However, asking to make a delta will trigger the index to be
348
# generated, and will generate a proper delta
349
delta = di.make_delta(_text2)
350
self.assertTrue(di._has_index())
351
self.assertEqual(b'N\x90/\x1fdiffer from\nagainst other text\n', delta)
353
def test_add_source_max_bytes_to_index(self):
354
di = self._gc_module.DeltaIndex()
355
di._max_bytes_to_index = 3 * 16
356
di.add_source(_text1, 0) # (77 bytes -1) // 3 = 25 byte stride
357
di.add_source(_text3, 3) # (135 bytes -1) // 3 = 44 byte stride
358
start2 = len(_text1) + 3
359
hash_list, entry_list = di._dump_index()
360
self.assertEqual(16, len(hash_list))
361
self.assertEqual(67, len(entry_list))
362
just_entries = sorted([(text_offset, hash_val)
363
for text_offset, hash_val in entry_list
364
if text_offset != 0 or hash_val != 0])
365
rabin_hash = self._gc_module._rabin_hash
366
self.assertEqual([(25, rabin_hash(_text1[10:26])),
367
(50, rabin_hash(_text1[35:51])),
368
(75, rabin_hash(_text1[60:76])),
369
(start2 + 44, rabin_hash(_text3[29:45])),
370
(start2 + 88, rabin_hash(_text3[73:89])),
371
(start2 + 132, rabin_hash(_text3[117:133])),
374
def test_second_add_source_triggers_make_index(self):
375
di = self._gc_module.DeltaIndex()
376
self.assertFalse(di._has_index())
377
di.add_source(_text1, 0)
378
self.assertFalse(di._has_index())
379
di.add_source(_text2, 0)
380
self.assertTrue(di._has_index())
382
def test_make_delta(self):
383
di = self._gc_module.DeltaIndex(_text1)
384
delta = di.make_delta(_text2)
385
self.assertEqual(b'N\x90/\x1fdiffer from\nagainst other text\n', delta)
387
def test_delta_against_multiple_sources(self):
388
di = self._gc_module.DeltaIndex()
389
di.add_source(_first_text, 0)
390
self.assertEqual(len(_first_text), di._source_offset)
391
di.add_source(_second_text, 0)
392
self.assertEqual(len(_first_text) + len(_second_text),
394
delta = di.make_delta(_third_text)
395
result = self._gc_module.apply_delta(_first_text + _second_text, delta)
396
self.assertEqualDiff(_third_text, result)
397
self.assertEqual(b'\x85\x01\x90\x14\x0chas some in '
398
b'\x91v6\x03and\x91d"\x91:\n', delta)
400
def test_delta_with_offsets(self):
401
di = self._gc_module.DeltaIndex()
402
di.add_source(_first_text, 5)
403
self.assertEqual(len(_first_text) + 5, di._source_offset)
404
di.add_source(_second_text, 10)
405
self.assertEqual(len(_first_text) + len(_second_text) + 15,
407
delta = di.make_delta(_third_text)
408
self.assertIsNot(None, delta)
409
result = self._gc_module.apply_delta(
410
b'12345' + _first_text + b'1234567890' + _second_text, delta)
411
self.assertIsNot(None, result)
412
self.assertEqualDiff(_third_text, result)
413
self.assertEqual(b'\x85\x01\x91\x05\x14\x0chas some in '
414
b'\x91\x856\x03and\x91s"\x91?\n', delta)
416
def test_delta_with_delta_bytes(self):
417
di = self._gc_module.DeltaIndex()
419
di.add_source(_first_text, 0)
420
self.assertEqual(len(_first_text), di._source_offset)
421
delta = di.make_delta(_second_text)
422
self.assertEqual(b'h\tsome more\x91\x019'
423
b'&previous text\nand has some extra text\n', delta)
424
di.add_delta_source(delta, 0)
426
self.assertEqual(len(_first_text) + len(delta), di._source_offset)
427
second_delta = di.make_delta(_third_text)
428
result = self._gc_module.apply_delta(source, second_delta)
429
self.assertEqualDiff(_third_text, result)
430
# We should be able to match against the
431
# 'previous text\nand has some...' that was part of the delta bytes
432
# Note that we don't match the 'common with the', because it isn't long
433
# enough to match in the original text, and those bytes are not present
434
# in the delta for the second text.
435
self.assertEqual(b'\x85\x01\x90\x14\x1chas some in common with the '
436
b'\x91S&\x03and\x91\x18,', second_delta)
437
# Add this delta, and create a new delta for the same text. We should
438
# find the remaining text, and only insert the short 'and' text.
439
di.add_delta_source(second_delta, 0)
440
source += second_delta
441
third_delta = di.make_delta(_third_text)
442
result = self._gc_module.apply_delta(source, third_delta)
443
self.assertEqualDiff(_third_text, result)
444
self.assertEqual(b'\x85\x01\x90\x14\x91\x7e\x1c'
445
b'\x91S&\x03and\x91\x18,', third_delta)
446
# Now create a delta, which we know won't be able to be 'fit' into the
448
fourth_delta = di.make_delta(_fourth_text)
449
self.assertEqual(_fourth_text,
450
self._gc_module.apply_delta(source, fourth_delta))
451
self.assertEqual(b'\x80\x01'
452
b'\x7f123456789012345\nsame rabin hash\n'
453
b'123456789012345\nsame rabin hash\n'
454
b'123456789012345\nsame rabin hash\n'
455
b'123456789012345\nsame rabin hash'
456
b'\x01\n', fourth_delta)
457
di.add_delta_source(fourth_delta, 0)
458
source += fourth_delta
459
# With the next delta, everything should be found
460
fifth_delta = di.make_delta(_fourth_text)
461
self.assertEqual(_fourth_text,
462
self._gc_module.apply_delta(source, fifth_delta))
463
self.assertEqual(b'\x80\x01\x91\xa7\x7f\x01\n', fifth_delta)
466
class TestCopyInstruction(tests.TestCase):
468
def assertEncode(self, expected, offset, length):
469
data = _groupcompress_py.encode_copy_instruction(offset, length)
470
self.assertEqual(expected, data)
472
def assertDecode(self, exp_offset, exp_length, exp_newpos, data, pos):
473
cmd = indexbytes(data, pos)
475
out = _groupcompress_py.decode_copy_instruction(data, cmd, pos)
476
self.assertEqual((exp_offset, exp_length, exp_newpos), out)
478
def test_encode_no_length(self):
479
self.assertEncode(b'\x80', 0, 64 * 1024)
480
self.assertEncode(b'\x81\x01', 1, 64 * 1024)
481
self.assertEncode(b'\x81\x0a', 10, 64 * 1024)
482
self.assertEncode(b'\x81\xff', 255, 64 * 1024)
483
self.assertEncode(b'\x82\x01', 256, 64 * 1024)
484
self.assertEncode(b'\x83\x01\x01', 257, 64 * 1024)
485
self.assertEncode(b'\x8F\xff\xff\xff\xff', 0xFFFFFFFF, 64 * 1024)
486
self.assertEncode(b'\x8E\xff\xff\xff', 0xFFFFFF00, 64 * 1024)
487
self.assertEncode(b'\x8D\xff\xff\xff', 0xFFFF00FF, 64 * 1024)
488
self.assertEncode(b'\x8B\xff\xff\xff', 0xFF00FFFF, 64 * 1024)
489
self.assertEncode(b'\x87\xff\xff\xff', 0x00FFFFFF, 64 * 1024)
490
self.assertEncode(b'\x8F\x04\x03\x02\x01', 0x01020304, 64 * 1024)
492
def test_encode_no_offset(self):
493
self.assertEncode(b'\x90\x01', 0, 1)
494
self.assertEncode(b'\x90\x0a', 0, 10)
495
self.assertEncode(b'\x90\xff', 0, 255)
496
self.assertEncode(b'\xA0\x01', 0, 256)
497
self.assertEncode(b'\xB0\x01\x01', 0, 257)
498
self.assertEncode(b'\xB0\xff\xff', 0, 0xFFFF)
499
# Special case, if copy == 64KiB, then we store exactly 0
500
# Note that this puns with a copy of exactly 0 bytes, but we don't care
501
# about that, as we would never actually copy 0 bytes
502
self.assertEncode(b'\x80', 0, 64 * 1024)
504
def test_encode(self):
505
self.assertEncode(b'\x91\x01\x01', 1, 1)
506
self.assertEncode(b'\x91\x09\x0a', 9, 10)
507
self.assertEncode(b'\x91\xfe\xff', 254, 255)
508
self.assertEncode(b'\xA2\x02\x01', 512, 256)
509
self.assertEncode(b'\xB3\x02\x01\x01\x01', 258, 257)
510
self.assertEncode(b'\xB0\x01\x01', 0, 257)
511
# Special case, if copy == 64KiB, then we store exactly 0
512
# Note that this puns with a copy of exactly 0 bytes, but we don't care
513
# about that, as we would never actually copy 0 bytes
514
self.assertEncode(b'\x81\x0a', 10, 64 * 1024)
516
def test_decode_no_length(self):
517
# If length is 0, it is interpreted as 64KiB
518
# The shortest possible instruction is a copy of 64KiB from offset 0
519
self.assertDecode(0, 65536, 1, b'\x80', 0)
520
self.assertDecode(1, 65536, 2, b'\x81\x01', 0)
521
self.assertDecode(10, 65536, 2, b'\x81\x0a', 0)
522
self.assertDecode(255, 65536, 2, b'\x81\xff', 0)
523
self.assertDecode(256, 65536, 2, b'\x82\x01', 0)
524
self.assertDecode(257, 65536, 3, b'\x83\x01\x01', 0)
525
self.assertDecode(0xFFFFFFFF, 65536, 5, b'\x8F\xff\xff\xff\xff', 0)
526
self.assertDecode(0xFFFFFF00, 65536, 4, b'\x8E\xff\xff\xff', 0)
527
self.assertDecode(0xFFFF00FF, 65536, 4, b'\x8D\xff\xff\xff', 0)
528
self.assertDecode(0xFF00FFFF, 65536, 4, b'\x8B\xff\xff\xff', 0)
529
self.assertDecode(0x00FFFFFF, 65536, 4, b'\x87\xff\xff\xff', 0)
530
self.assertDecode(0x01020304, 65536, 5, b'\x8F\x04\x03\x02\x01', 0)
532
def test_decode_no_offset(self):
533
self.assertDecode(0, 1, 2, b'\x90\x01', 0)
534
self.assertDecode(0, 10, 2, b'\x90\x0a', 0)
535
self.assertDecode(0, 255, 2, b'\x90\xff', 0)
536
self.assertDecode(0, 256, 2, b'\xA0\x01', 0)
537
self.assertDecode(0, 257, 3, b'\xB0\x01\x01', 0)
538
self.assertDecode(0, 65535, 3, b'\xB0\xff\xff', 0)
539
# Special case, if copy == 64KiB, then we store exactly 0
540
# Note that this puns with a copy of exactly 0 bytes, but we don't care
541
# about that, as we would never actually copy 0 bytes
542
self.assertDecode(0, 65536, 1, b'\x80', 0)
544
def test_decode(self):
545
self.assertDecode(1, 1, 3, b'\x91\x01\x01', 0)
546
self.assertDecode(9, 10, 3, b'\x91\x09\x0a', 0)
547
self.assertDecode(254, 255, 3, b'\x91\xfe\xff', 0)
548
self.assertDecode(512, 256, 3, b'\xA2\x02\x01', 0)
549
self.assertDecode(258, 257, 5, b'\xB3\x02\x01\x01\x01', 0)
550
self.assertDecode(0, 257, 3, b'\xB0\x01\x01', 0)
552
def test_decode_not_start(self):
553
self.assertDecode(1, 1, 6, b'abc\x91\x01\x01def', 3)
554
self.assertDecode(9, 10, 5, b'ab\x91\x09\x0ade', 2)
555
self.assertDecode(254, 255, 6, b'not\x91\xfe\xffcopy', 3)
558
class TestBase128Int(tests.TestCase):
560
scenarios = module_scenarios()
562
_gc_module = None # Set by load_tests
564
def assertEqualEncode(self, bytes, val):
565
self.assertEqual(bytes, self._gc_module.encode_base128_int(val))
567
def assertEqualDecode(self, val, num_decode, bytes):
568
self.assertEqual((val, num_decode),
569
self._gc_module.decode_base128_int(bytes))
571
def test_encode(self):
572
self.assertEqualEncode(b'\x01', 1)
573
self.assertEqualEncode(b'\x02', 2)
574
self.assertEqualEncode(b'\x7f', 127)
575
self.assertEqualEncode(b'\x80\x01', 128)
576
self.assertEqualEncode(b'\xff\x01', 255)
577
self.assertEqualEncode(b'\x80\x02', 256)
578
self.assertEqualEncode(b'\xff\xff\xff\xff\x0f', 0xFFFFFFFF)
580
def test_decode(self):
581
self.assertEqualDecode(1, 1, b'\x01')
582
self.assertEqualDecode(2, 1, b'\x02')
583
self.assertEqualDecode(127, 1, b'\x7f')
584
self.assertEqualDecode(128, 2, b'\x80\x01')
585
self.assertEqualDecode(255, 2, b'\xff\x01')
586
self.assertEqualDecode(256, 2, b'\x80\x02')
587
self.assertEqualDecode(0xFFFFFFFF, 5, b'\xff\xff\xff\xff\x0f')
589
def test_decode_with_trailing_bytes(self):
590
self.assertEqualDecode(1, 1, b'\x01abcdef')
591
self.assertEqualDecode(127, 1, b'\x7f\x01')
592
self.assertEqualDecode(128, 2, b'\x80\x01abcdef')
593
self.assertEqualDecode(255, 2, b'\xff\x01\xff')