1
# Copyright (C) 2008-2011 Canonical Ltd
3
# This program is free software; you can redistribute it and/or modify
4
# it under the terms of the GNU General Public License as published by
5
# the Free Software Foundation; either version 2 of the License, or
6
# (at your option) any later version.
8
# This program is distributed in the hope that it will be useful,
9
# but WITHOUT ANY WARRANTY; without even the implied warranty of
10
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11
# GNU General Public License for more details.
13
# You should have received a copy of the GNU General Public License
14
# along with this program; if not, write to the Free Software
15
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
17
"""Tests for the python and pyrex extensions of groupcompress"""
27
from .scenarios import (
28
load_tests_apply_scenarios,
35
def module_scenarios():
37
('python', {'_gc_module': _groupcompress_py}),
39
if compiled_groupcompress_feature.available():
40
gc_module = compiled_groupcompress_feature.module
41
scenarios.append(('C',
42
{'_gc_module': gc_module}))
46
def two_way_scenarios():
48
('PP', {'make_delta': _groupcompress_py.make_delta,
49
'apply_delta': _groupcompress_py.apply_delta})
51
if compiled_groupcompress_feature.available():
52
gc_module = compiled_groupcompress_feature.module
54
('CC', {'make_delta': gc_module.make_delta,
55
'apply_delta': gc_module.apply_delta}),
56
('PC', {'make_delta': _groupcompress_py.make_delta,
57
'apply_delta': gc_module.apply_delta}),
58
('CP', {'make_delta': gc_module.make_delta,
59
'apply_delta': _groupcompress_py.apply_delta}),
64
load_tests = load_tests_apply_scenarios
67
compiled_groupcompress_feature = features.ModuleAvailableFeature(
68
'breezy.bzr._groupcompress_pyx')
73
which is meant to be matched
80
which is meant to differ from
87
which is meant to be matched
91
at the end of the file
97
common with the next text
101
some more bit of text, that
102
does not have much in
103
common with the previous text
104
and has some extra text
110
has some in common with the previous text
111
and has some extra text
113
common with the next text
128
class TestMakeAndApplyDelta(tests.TestCase):
130
scenarios = module_scenarios()
131
_gc_module = None # Set by load_tests
134
super(TestMakeAndApplyDelta, self).setUp()
135
self.make_delta = self._gc_module.make_delta
136
self.apply_delta = self._gc_module.apply_delta
137
self.apply_delta_to_source = self._gc_module.apply_delta_to_source
139
def test_make_delta_is_typesafe(self):
140
self.make_delta(b'a string', b'another string')
142
def _check_make_delta(string1, string2):
143
self.assertRaises(TypeError, self.make_delta, string1, string2)
145
_check_make_delta(b'a string', object())
146
_check_make_delta(b'a string', u'not a string')
147
_check_make_delta(object(), b'a string')
148
_check_make_delta(u'not a string', b'a string')
150
def test_make_noop_delta(self):
151
ident_delta = self.make_delta(_text1, _text1)
152
self.assertEqual(b'M\x90M', ident_delta)
153
ident_delta = self.make_delta(_text2, _text2)
154
self.assertEqual(b'N\x90N', ident_delta)
155
ident_delta = self.make_delta(_text3, _text3)
156
self.assertEqual(b'\x87\x01\x90\x87', ident_delta)
158
def assertDeltaIn(self, delta1, delta2, delta):
159
"""Make sure that the delta bytes match one of the expectations."""
160
# In general, the python delta matcher gives different results than the
161
# pyrex delta matcher. Both should be valid deltas, though.
162
if delta not in (delta1, delta2):
163
self.fail(b"Delta bytes:\n"
167
% (delta, delta1, delta2))
169
def test_make_delta(self):
170
delta = self.make_delta(_text1, _text2)
172
b'N\x90/\x1fdiffer from\nagainst other text\n',
173
b'N\x90\x1d\x1ewhich is meant to differ from\n\x91:\x13',
175
delta = self.make_delta(_text2, _text1)
177
b'M\x90/\x1ebe matched\nagainst other text\n',
178
b'M\x90\x1d\x1dwhich is meant to be matched\n\x91;\x13',
180
delta = self.make_delta(_text3, _text1)
181
self.assertEqual(b'M\x90M', delta)
182
delta = self.make_delta(_text3, _text2)
184
b'N\x90/\x1fdiffer from\nagainst other text\n',
185
b'N\x90\x1d\x1ewhich is meant to differ from\n\x91:\x13',
188
def test_make_delta_with_large_copies(self):
189
# We want to have a copy that is larger than 64kB, which forces us to
190
# issue multiple copy instructions.
191
big_text = _text3 * 1220
192
delta = self.make_delta(big_text, big_text)
194
b'\xdc\x86\x0a' # Encoding the length of the uncompressed text
195
b'\x80' # Copy 64kB, starting at byte 0
196
b'\x84\x01' # and another 64kB starting at 64kB
197
b'\xb4\x02\x5c\x83', # And the bit of tail.
198
None, # Both implementations should be identical
201
def test_apply_delta_is_typesafe(self):
202
self.apply_delta(_text1, b'M\x90M')
203
self.assertRaises(TypeError, self.apply_delta, object(), b'M\x90M')
204
self.assertRaises(TypeError, self.apply_delta,
205
_text1.decode('latin1'), b'M\x90M')
206
self.assertRaises(TypeError, self.apply_delta, _text1, u'M\x90M')
207
self.assertRaises(TypeError, self.apply_delta, _text1, object())
209
def test_apply_delta(self):
210
target = self.apply_delta(_text1,
211
b'N\x90/\x1fdiffer from\nagainst other text\n')
212
self.assertEqual(_text2, target)
213
target = self.apply_delta(_text2,
214
b'M\x90/\x1ebe matched\nagainst other text\n')
215
self.assertEqual(_text1, target)
217
def test_apply_delta_to_source_is_safe(self):
218
self.assertRaises(TypeError,
219
self.apply_delta_to_source, object(), 0, 1)
220
self.assertRaises(TypeError,
221
self.apply_delta_to_source, u'unicode str', 0, 1)
223
self.assertRaises(ValueError,
224
self.apply_delta_to_source, b'foo', 1, 4)
226
self.assertRaises(ValueError,
227
self.apply_delta_to_source, b'foo', 5, 3)
229
self.assertRaises(ValueError,
230
self.apply_delta_to_source, b'foo', 3, 2)
232
def test_apply_delta_to_source(self):
233
source_and_delta = (_text1
234
+ b'N\x90/\x1fdiffer from\nagainst other text\n')
235
self.assertEqual(_text2, self.apply_delta_to_source(source_and_delta,
236
len(_text1), len(source_and_delta)))
239
class TestMakeAndApplyCompatible(tests.TestCase):
241
scenarios = two_way_scenarios()
243
make_delta = None # Set by load_tests
244
apply_delta = None # Set by load_tests
246
def assertMakeAndApply(self, source, target):
247
"""Assert that generating a delta and applying gives success."""
248
delta = self.make_delta(source, target)
249
bytes = self.apply_delta(source, delta)
250
self.assertEqualDiff(target, bytes)
252
def test_direct(self):
253
self.assertMakeAndApply(_text1, _text2)
254
self.assertMakeAndApply(_text2, _text1)
255
self.assertMakeAndApply(_text1, _text3)
256
self.assertMakeAndApply(_text3, _text1)
257
self.assertMakeAndApply(_text2, _text3)
258
self.assertMakeAndApply(_text3, _text2)
261
class TestDeltaIndex(tests.TestCase):
264
super(TestDeltaIndex, self).setUp()
265
# This test isn't multiplied, because we only have DeltaIndex for the
267
# We call this here, because _test_needs_features happens after setUp
268
self.requireFeature(compiled_groupcompress_feature)
269
self._gc_module = compiled_groupcompress_feature.module
272
di = self._gc_module.DeltaIndex(b'test text\n')
273
self.assertEqual('DeltaIndex(1, 10)', repr(di))
275
def test_sizeof(self):
276
di = self._gc_module.DeltaIndex()
277
# Exact value will depend on platform but should include sources
278
# source_info is a pointer and two longs so at least 12 bytes
279
lower_bound = di._max_num_sources * 12
280
self.assertGreater(sys.getsizeof(di), lower_bound)
282
def test__dump_no_index(self):
283
di = self._gc_module.DeltaIndex()
284
self.assertEqual(None, di._dump_index())
286
def test__dump_index_simple(self):
287
di = self._gc_module.DeltaIndex()
288
di.add_source(_text1, 0)
289
self.assertFalse(di._has_index())
290
self.assertEqual(None, di._dump_index())
291
_ = di.make_delta(_text1)
292
self.assertTrue(di._has_index())
293
hash_list, entry_list = di._dump_index()
294
self.assertEqual(16, len(hash_list))
295
self.assertEqual(68, len(entry_list))
296
just_entries = [(idx, text_offset, hash_val)
297
for idx, (text_offset, hash_val)
298
in enumerate(entry_list)
299
if text_offset != 0 or hash_val != 0]
300
rabin_hash = self._gc_module._rabin_hash
301
self.assertEqual([(8, 16, rabin_hash(_text1[1:17])),
302
(25, 48, rabin_hash(_text1[33:49])),
303
(34, 32, rabin_hash(_text1[17:33])),
304
(47, 64, rabin_hash(_text1[49:65])),
306
# This ensures that the hash map points to the location we expect it to
307
for entry_idx, text_offset, hash_val in just_entries:
308
self.assertEqual(entry_idx, hash_list[hash_val & 0xf])
310
def test__dump_index_two_sources(self):
311
di = self._gc_module.DeltaIndex()
312
di.add_source(_text1, 0)
313
di.add_source(_text2, 2)
314
start2 = len(_text1) + 2
315
self.assertTrue(di._has_index())
316
hash_list, entry_list = di._dump_index()
317
self.assertEqual(16, len(hash_list))
318
self.assertEqual(68, len(entry_list))
319
just_entries = [(idx, text_offset, hash_val)
320
for idx, (text_offset, hash_val)
321
in enumerate(entry_list)
322
if text_offset != 0 or hash_val != 0]
323
rabin_hash = self._gc_module._rabin_hash
324
self.assertEqual([(8, 16, rabin_hash(_text1[1:17])),
325
(9, start2 + 16, rabin_hash(_text2[1:17])),
326
(25, 48, rabin_hash(_text1[33:49])),
327
(30, start2 + 64, rabin_hash(_text2[49:65])),
328
(34, 32, rabin_hash(_text1[17:33])),
329
(35, start2 + 32, rabin_hash(_text2[17:33])),
330
(43, start2 + 48, rabin_hash(_text2[33:49])),
331
(47, 64, rabin_hash(_text1[49:65])),
333
# Each entry should be in the appropriate hash bucket.
334
for entry_idx, text_offset, hash_val in just_entries:
335
hash_idx = hash_val & 0xf
337
hash_list[hash_idx] <= entry_idx < hash_list[hash_idx + 1])
339
def test_first_add_source_doesnt_index_until_make_delta(self):
340
di = self._gc_module.DeltaIndex()
341
self.assertFalse(di._has_index())
342
di.add_source(_text1, 0)
343
self.assertFalse(di._has_index())
344
# However, asking to make a delta will trigger the index to be
345
# generated, and will generate a proper delta
346
delta = di.make_delta(_text2)
347
self.assertTrue(di._has_index())
348
self.assertEqual(b'N\x90/\x1fdiffer from\nagainst other text\n', delta)
350
def test_add_source_max_bytes_to_index(self):
351
di = self._gc_module.DeltaIndex()
352
di._max_bytes_to_index = 3 * 16
353
di.add_source(_text1, 0) # (77 bytes -1) // 3 = 25 byte stride
354
di.add_source(_text3, 3) # (135 bytes -1) // 3 = 44 byte stride
355
start2 = len(_text1) + 3
356
hash_list, entry_list = di._dump_index()
357
self.assertEqual(16, len(hash_list))
358
self.assertEqual(67, len(entry_list))
359
just_entries = sorted([(text_offset, hash_val)
360
for text_offset, hash_val in entry_list
361
if text_offset != 0 or hash_val != 0])
362
rabin_hash = self._gc_module._rabin_hash
363
self.assertEqual([(25, rabin_hash(_text1[10:26])),
364
(50, rabin_hash(_text1[35:51])),
365
(75, rabin_hash(_text1[60:76])),
366
(start2 + 44, rabin_hash(_text3[29:45])),
367
(start2 + 88, rabin_hash(_text3[73:89])),
368
(start2 + 132, rabin_hash(_text3[117:133])),
371
def test_second_add_source_triggers_make_index(self):
372
di = self._gc_module.DeltaIndex()
373
self.assertFalse(di._has_index())
374
di.add_source(_text1, 0)
375
self.assertFalse(di._has_index())
376
di.add_source(_text2, 0)
377
self.assertTrue(di._has_index())
379
def test_make_delta(self):
380
di = self._gc_module.DeltaIndex(_text1)
381
delta = di.make_delta(_text2)
382
self.assertEqual(b'N\x90/\x1fdiffer from\nagainst other text\n', delta)
384
def test_delta_against_multiple_sources(self):
385
di = self._gc_module.DeltaIndex()
386
di.add_source(_first_text, 0)
387
self.assertEqual(len(_first_text), di._source_offset)
388
di.add_source(_second_text, 0)
389
self.assertEqual(len(_first_text) + len(_second_text),
391
delta = di.make_delta(_third_text)
392
result = self._gc_module.apply_delta(_first_text + _second_text, delta)
393
self.assertEqualDiff(_third_text, result)
394
self.assertEqual(b'\x85\x01\x90\x14\x0chas some in '
395
b'\x91v6\x03and\x91d"\x91:\n', delta)
397
def test_delta_with_offsets(self):
398
di = self._gc_module.DeltaIndex()
399
di.add_source(_first_text, 5)
400
self.assertEqual(len(_first_text) + 5, di._source_offset)
401
di.add_source(_second_text, 10)
402
self.assertEqual(len(_first_text) + len(_second_text) + 15,
404
delta = di.make_delta(_third_text)
405
self.assertIsNot(None, delta)
406
result = self._gc_module.apply_delta(
407
b'12345' + _first_text + b'1234567890' + _second_text, delta)
408
self.assertIsNot(None, result)
409
self.assertEqualDiff(_third_text, result)
410
self.assertEqual(b'\x85\x01\x91\x05\x14\x0chas some in '
411
b'\x91\x856\x03and\x91s"\x91?\n', delta)
413
def test_delta_with_delta_bytes(self):
414
di = self._gc_module.DeltaIndex()
416
di.add_source(_first_text, 0)
417
self.assertEqual(len(_first_text), di._source_offset)
418
delta = di.make_delta(_second_text)
419
self.assertEqual(b'h\tsome more\x91\x019'
420
b'&previous text\nand has some extra text\n', delta)
421
di.add_delta_source(delta, 0)
423
self.assertEqual(len(_first_text) + len(delta), di._source_offset)
424
second_delta = di.make_delta(_third_text)
425
result = self._gc_module.apply_delta(source, second_delta)
426
self.assertEqualDiff(_third_text, result)
427
# We should be able to match against the
428
# 'previous text\nand has some...' that was part of the delta bytes
429
# Note that we don't match the 'common with the', because it isn't long
430
# enough to match in the original text, and those bytes are not present
431
# in the delta for the second text.
432
self.assertEqual(b'\x85\x01\x90\x14\x1chas some in common with the '
433
b'\x91S&\x03and\x91\x18,', second_delta)
434
# Add this delta, and create a new delta for the same text. We should
435
# find the remaining text, and only insert the short 'and' text.
436
di.add_delta_source(second_delta, 0)
437
source += second_delta
438
third_delta = di.make_delta(_third_text)
439
result = self._gc_module.apply_delta(source, third_delta)
440
self.assertEqualDiff(_third_text, result)
441
self.assertEqual(b'\x85\x01\x90\x14\x91\x7e\x1c'
442
b'\x91S&\x03and\x91\x18,', third_delta)
443
# Now create a delta, which we know won't be able to be 'fit' into the
445
fourth_delta = di.make_delta(_fourth_text)
446
self.assertEqual(_fourth_text,
447
self._gc_module.apply_delta(source, fourth_delta))
448
self.assertEqual(b'\x80\x01'
449
b'\x7f123456789012345\nsame rabin hash\n'
450
b'123456789012345\nsame rabin hash\n'
451
b'123456789012345\nsame rabin hash\n'
452
b'123456789012345\nsame rabin hash'
453
b'\x01\n', fourth_delta)
454
di.add_delta_source(fourth_delta, 0)
455
source += fourth_delta
456
# With the next delta, everything should be found
457
fifth_delta = di.make_delta(_fourth_text)
458
self.assertEqual(_fourth_text,
459
self._gc_module.apply_delta(source, fifth_delta))
460
self.assertEqual(b'\x80\x01\x91\xa7\x7f\x01\n', fifth_delta)
463
class TestCopyInstruction(tests.TestCase):
465
def assertEncode(self, expected, offset, length):
466
data = _groupcompress_py.encode_copy_instruction(offset, length)
467
self.assertEqual(expected, data)
469
def assertDecode(self, exp_offset, exp_length, exp_newpos, data, pos):
472
out = _groupcompress_py.decode_copy_instruction(data, cmd, pos)
473
self.assertEqual((exp_offset, exp_length, exp_newpos), out)
475
def test_encode_no_length(self):
476
self.assertEncode(b'\x80', 0, 64 * 1024)
477
self.assertEncode(b'\x81\x01', 1, 64 * 1024)
478
self.assertEncode(b'\x81\x0a', 10, 64 * 1024)
479
self.assertEncode(b'\x81\xff', 255, 64 * 1024)
480
self.assertEncode(b'\x82\x01', 256, 64 * 1024)
481
self.assertEncode(b'\x83\x01\x01', 257, 64 * 1024)
482
self.assertEncode(b'\x8F\xff\xff\xff\xff', 0xFFFFFFFF, 64 * 1024)
483
self.assertEncode(b'\x8E\xff\xff\xff', 0xFFFFFF00, 64 * 1024)
484
self.assertEncode(b'\x8D\xff\xff\xff', 0xFFFF00FF, 64 * 1024)
485
self.assertEncode(b'\x8B\xff\xff\xff', 0xFF00FFFF, 64 * 1024)
486
self.assertEncode(b'\x87\xff\xff\xff', 0x00FFFFFF, 64 * 1024)
487
self.assertEncode(b'\x8F\x04\x03\x02\x01', 0x01020304, 64 * 1024)
489
def test_encode_no_offset(self):
490
self.assertEncode(b'\x90\x01', 0, 1)
491
self.assertEncode(b'\x90\x0a', 0, 10)
492
self.assertEncode(b'\x90\xff', 0, 255)
493
self.assertEncode(b'\xA0\x01', 0, 256)
494
self.assertEncode(b'\xB0\x01\x01', 0, 257)
495
self.assertEncode(b'\xB0\xff\xff', 0, 0xFFFF)
496
# Special case, if copy == 64KiB, then we store exactly 0
497
# Note that this puns with a copy of exactly 0 bytes, but we don't care
498
# about that, as we would never actually copy 0 bytes
499
self.assertEncode(b'\x80', 0, 64 * 1024)
501
def test_encode(self):
502
self.assertEncode(b'\x91\x01\x01', 1, 1)
503
self.assertEncode(b'\x91\x09\x0a', 9, 10)
504
self.assertEncode(b'\x91\xfe\xff', 254, 255)
505
self.assertEncode(b'\xA2\x02\x01', 512, 256)
506
self.assertEncode(b'\xB3\x02\x01\x01\x01', 258, 257)
507
self.assertEncode(b'\xB0\x01\x01', 0, 257)
508
# Special case, if copy == 64KiB, then we store exactly 0
509
# Note that this puns with a copy of exactly 0 bytes, but we don't care
510
# about that, as we would never actually copy 0 bytes
511
self.assertEncode(b'\x81\x0a', 10, 64 * 1024)
513
def test_decode_no_length(self):
514
# If length is 0, it is interpreted as 64KiB
515
# The shortest possible instruction is a copy of 64KiB from offset 0
516
self.assertDecode(0, 65536, 1, b'\x80', 0)
517
self.assertDecode(1, 65536, 2, b'\x81\x01', 0)
518
self.assertDecode(10, 65536, 2, b'\x81\x0a', 0)
519
self.assertDecode(255, 65536, 2, b'\x81\xff', 0)
520
self.assertDecode(256, 65536, 2, b'\x82\x01', 0)
521
self.assertDecode(257, 65536, 3, b'\x83\x01\x01', 0)
522
self.assertDecode(0xFFFFFFFF, 65536, 5, b'\x8F\xff\xff\xff\xff', 0)
523
self.assertDecode(0xFFFFFF00, 65536, 4, b'\x8E\xff\xff\xff', 0)
524
self.assertDecode(0xFFFF00FF, 65536, 4, b'\x8D\xff\xff\xff', 0)
525
self.assertDecode(0xFF00FFFF, 65536, 4, b'\x8B\xff\xff\xff', 0)
526
self.assertDecode(0x00FFFFFF, 65536, 4, b'\x87\xff\xff\xff', 0)
527
self.assertDecode(0x01020304, 65536, 5, b'\x8F\x04\x03\x02\x01', 0)
529
def test_decode_no_offset(self):
530
self.assertDecode(0, 1, 2, b'\x90\x01', 0)
531
self.assertDecode(0, 10, 2, b'\x90\x0a', 0)
532
self.assertDecode(0, 255, 2, b'\x90\xff', 0)
533
self.assertDecode(0, 256, 2, b'\xA0\x01', 0)
534
self.assertDecode(0, 257, 3, b'\xB0\x01\x01', 0)
535
self.assertDecode(0, 65535, 3, b'\xB0\xff\xff', 0)
536
# Special case, if copy == 64KiB, then we store exactly 0
537
# Note that this puns with a copy of exactly 0 bytes, but we don't care
538
# about that, as we would never actually copy 0 bytes
539
self.assertDecode(0, 65536, 1, b'\x80', 0)
541
def test_decode(self):
542
self.assertDecode(1, 1, 3, b'\x91\x01\x01', 0)
543
self.assertDecode(9, 10, 3, b'\x91\x09\x0a', 0)
544
self.assertDecode(254, 255, 3, b'\x91\xfe\xff', 0)
545
self.assertDecode(512, 256, 3, b'\xA2\x02\x01', 0)
546
self.assertDecode(258, 257, 5, b'\xB3\x02\x01\x01\x01', 0)
547
self.assertDecode(0, 257, 3, b'\xB0\x01\x01', 0)
549
def test_decode_not_start(self):
550
self.assertDecode(1, 1, 6, b'abc\x91\x01\x01def', 3)
551
self.assertDecode(9, 10, 5, b'ab\x91\x09\x0ade', 2)
552
self.assertDecode(254, 255, 6, b'not\x91\xfe\xffcopy', 3)
555
class TestBase128Int(tests.TestCase):
557
scenarios = module_scenarios()
559
_gc_module = None # Set by load_tests
561
def assertEqualEncode(self, bytes, val):
562
self.assertEqual(bytes, self._gc_module.encode_base128_int(val))
564
def assertEqualDecode(self, val, num_decode, bytes):
565
self.assertEqual((val, num_decode),
566
self._gc_module.decode_base128_int(bytes))
568
def test_encode(self):
569
self.assertEqualEncode(b'\x01', 1)
570
self.assertEqualEncode(b'\x02', 2)
571
self.assertEqualEncode(b'\x7f', 127)
572
self.assertEqualEncode(b'\x80\x01', 128)
573
self.assertEqualEncode(b'\xff\x01', 255)
574
self.assertEqualEncode(b'\x80\x02', 256)
575
self.assertEqualEncode(b'\xff\xff\xff\xff\x0f', 0xFFFFFFFF)
577
def test_decode(self):
578
self.assertEqualDecode(1, 1, b'\x01')
579
self.assertEqualDecode(2, 1, b'\x02')
580
self.assertEqualDecode(127, 1, b'\x7f')
581
self.assertEqualDecode(128, 2, b'\x80\x01')
582
self.assertEqualDecode(255, 2, b'\xff\x01')
583
self.assertEqualDecode(256, 2, b'\x80\x02')
584
self.assertEqualDecode(0xFFFFFFFF, 5, b'\xff\xff\xff\xff\x0f')
586
def test_decode_with_trailing_bytes(self):
587
self.assertEqualDecode(1, 1, b'\x01abcdef')
588
self.assertEqualDecode(127, 1, b'\x7f\x01')
589
self.assertEqualDecode(128, 2, b'\x80\x01abcdef')
590
self.assertEqualDecode(255, 2, b'\xff\x01\xff')