def test_apply_delta(self):
    """Applying a known delta to one sample text yields the other one."""
    # Delta expected to turn _text1 into _text2.
    target = self.apply_delta(
        _text1, b'N\x90/\x1fdiffer from\nagainst other text\n')
    self.assertEqual(_text2, target)
    # The reverse direction: _text2 back into _text1.
    target = self.apply_delta(
        _text2, b'M\x90/\x1ebe matched\nagainst other text\n')
    self.assertEqual(_text1, target)
def test_apply_delta_to_source_is_safe(self):
    """Bad source types and inconsistent offsets must raise, not crash."""
    apply_to_source = self.apply_delta_to_source
    # The source must be a byte string: neither an arbitrary object nor a
    # unicode string is accepted.
    self.assertRaises(TypeError, apply_to_source, object(), 0, 1)
    self.assertRaises(TypeError, apply_to_source, u'unicode str', 0, 1)
    # b'foo' is 3 bytes long. The two integers are presumably the start and
    # end offsets of the delta inside the source (that is how
    # test_apply_delta_to_source passes them) -- TODO confirm.
    # (1, 4): the end lies beyond the source.
    self.assertRaises(ValueError, apply_to_source, b'foo', 1, 4)
    # (5, 3): the start lies beyond the source.
    self.assertRaises(ValueError, apply_to_source, b'foo', 5, 3)
    # (3, 2): the start lies after the end.
    self.assertRaises(ValueError, apply_to_source, b'foo', 3, 2)
def test_apply_delta_to_source(self):
    """A delta appended to its own source text can be applied in place."""
    # The buffer holds _text1 immediately followed by the delta that is
    # expected to turn it into _text2.
    delta_start = len(_text1)
    source_and_delta = (
        _text1 + b'N\x90/\x1fdiffer from\nagainst other text\n')
    delta_end = len(source_and_delta)
    result = self.apply_delta_to_source(
        source_and_delta, delta_start, delta_end)
    self.assertEqual(_text2, result)
239
240
class TestMakeAndApplyCompatible(tests.TestCase):
241
242
scenarios = two_way_scenarios()
243
make_delta = None # Set by load_tests
244
apply_delta = None # Set by load_tests
244
make_delta = None # Set by load_tests
245
apply_delta = None # Set by load_tests
246
247
def assertMakeAndApply(self, source, target):
247
248
"""Assert that generating a delta and applying gives success."""
269
270
self._gc_module = compiled_groupcompress_feature.module
def test_repr(self):
    """repr() of a DeltaIndex built from one 10-byte source is stable."""
    # The source is passed as bytes, like every other delta/source literal
    # in these tests.
    # NOTE(review): the two numbers in the repr presumably are the number
    # of sources and the total source bytes (b'test text\n' is 10 bytes
    # long) -- confirm against the extension module.
    di = self._gc_module.DeltaIndex(b'test text\n')
    self.assertEqual('DeltaIndex(1, 10)', repr(di))
def test_sizeof(self):
    """sys.getsizeof() of a DeltaIndex accounts for its source table."""
    di = self._gc_module.DeltaIndex()
    # Exact value will depend on platform but should include sources
    # source_info is a pointer and two longs so at least 12 bytes
    lower_bound = di._max_num_sources * 12
    self.assertGreater(sys.getsizeof(di), lower_bound)
def test__dump_no_index(self):
    """_dump_index() gives None when no source has been added yet."""
    di = self._gc_module.DeltaIndex()
    self.assertEqual(None, di._dump_index())
295
289
self.assertEqual(68, len(entry_list))
296
290
just_entries = [(idx, text_offset, hash_val)
297
291
for idx, (text_offset, hash_val)
298
in enumerate(entry_list)
299
if text_offset != 0 or hash_val != 0]
292
in enumerate(entry_list)
293
if text_offset != 0 or hash_val != 0]
300
294
rabin_hash = self._gc_module._rabin_hash
301
295
self.assertEqual([(8, 16, rabin_hash(_text1[1:17])),
302
296
(25, 48, rabin_hash(_text1[33:49])),
303
297
(34, 32, rabin_hash(_text1[17:33])),
304
298
(47, 64, rabin_hash(_text1[49:65])),
306
300
# This ensures that the hash map points to the location we expect it to
307
301
for entry_idx, text_offset, hash_val in just_entries:
308
302
self.assertEqual(entry_idx, hash_list[hash_val & 0xf])
318
312
self.assertEqual(68, len(entry_list))
319
313
just_entries = [(idx, text_offset, hash_val)
320
314
for idx, (text_offset, hash_val)
321
in enumerate(entry_list)
322
if text_offset != 0 or hash_val != 0]
315
in enumerate(entry_list)
316
if text_offset != 0 or hash_val != 0]
323
317
rabin_hash = self._gc_module._rabin_hash
324
318
self.assertEqual([(8, 16, rabin_hash(_text1[1:17])),
325
(9, start2 + 16, rabin_hash(_text2[1:17])),
319
(9, start2+16, rabin_hash(_text2[1:17])),
326
320
(25, 48, rabin_hash(_text1[33:49])),
327
(30, start2 + 64, rabin_hash(_text2[49:65])),
321
(30, start2+64, rabin_hash(_text2[49:65])),
328
322
(34, 32, rabin_hash(_text1[17:33])),
329
(35, start2 + 32, rabin_hash(_text2[17:33])),
330
(43, start2 + 48, rabin_hash(_text2[33:49])),
323
(35, start2+32, rabin_hash(_text2[17:33])),
324
(43, start2+48, rabin_hash(_text2[33:49])),
331
325
(47, 64, rabin_hash(_text1[49:65])),
333
327
# Each entry should be in the appropriate hash bucket.
334
328
for entry_idx, text_offset, hash_val in just_entries:
335
329
hash_idx = hash_val & 0xf
337
hash_list[hash_idx] <= entry_idx < hash_list[hash_idx + 1])
331
hash_list[hash_idx] <= entry_idx < hash_list[hash_idx+1])
339
333
def test_first_add_source_doesnt_index_until_make_delta(self):
340
334
di = self._gc_module.DeltaIndex()
345
339
# generated, and will generate a proper delta
346
340
delta = di.make_delta(_text2)
347
341
self.assertTrue(di._has_index())
348
self.assertEqual(b'N\x90/\x1fdiffer from\nagainst other text\n', delta)
342
self.assertEqual('N\x90/\x1fdiffer from\nagainst other text\n', delta)
350
344
def test_add_source_max_bytes_to_index(self):
351
345
di = self._gc_module.DeltaIndex()
352
di._max_bytes_to_index = 3 * 16
353
di.add_source(_text1, 0) # (77 bytes -1) // 3 = 25 byte stride
354
di.add_source(_text3, 3) # (135 bytes -1) // 3 = 44 byte stride
346
di._max_bytes_to_index = 3*16
347
di.add_source(_text1, 0) # (77 bytes -1) // 3 = 25 byte stride
348
di.add_source(_text3, 3) # (135 bytes -1) // 3 = 44 byte stride
355
349
start2 = len(_text1) + 3
356
350
hash_list, entry_list = di._dump_index()
357
351
self.assertEqual(16, len(hash_list))
358
352
self.assertEqual(67, len(entry_list))
359
353
just_entries = sorted([(text_offset, hash_val)
360
354
for text_offset, hash_val in entry_list
361
if text_offset != 0 or hash_val != 0])
355
if text_offset != 0 or hash_val != 0])
362
356
rabin_hash = self._gc_module._rabin_hash
363
357
self.assertEqual([(25, rabin_hash(_text1[10:26])),
364
358
(50, rabin_hash(_text1[35:51])),
365
359
(75, rabin_hash(_text1[60:76])),
366
(start2 + 44, rabin_hash(_text3[29:45])),
367
(start2 + 88, rabin_hash(_text3[73:89])),
368
(start2 + 132, rabin_hash(_text3[117:133])),
360
(start2+44, rabin_hash(_text3[29:45])),
361
(start2+88, rabin_hash(_text3[73:89])),
362
(start2+132, rabin_hash(_text3[117:133])),
371
365
def test_second_add_source_triggers_make_index(self):
372
366
di = self._gc_module.DeltaIndex()
391
385
delta = di.make_delta(_third_text)
392
386
result = self._gc_module.apply_delta(_first_text + _second_text, delta)
393
387
self.assertEqualDiff(_third_text, result)
394
self.assertEqual(b'\x85\x01\x90\x14\x0chas some in '
395
b'\x91v6\x03and\x91d"\x91:\n', delta)
388
self.assertEqual('\x85\x01\x90\x14\x0chas some in '
389
'\x91v6\x03and\x91d"\x91:\n', delta)
397
391
def test_delta_with_offsets(self):
398
392
di = self._gc_module.DeltaIndex()
404
398
delta = di.make_delta(_third_text)
405
399
self.assertIsNot(None, delta)
406
400
result = self._gc_module.apply_delta(
407
b'12345' + _first_text + b'1234567890' + _second_text, delta)
401
'12345' + _first_text + '1234567890' + _second_text, delta)
408
402
self.assertIsNot(None, result)
409
403
self.assertEqualDiff(_third_text, result)
410
self.assertEqual(b'\x85\x01\x91\x05\x14\x0chas some in '
411
b'\x91\x856\x03and\x91s"\x91?\n', delta)
404
self.assertEqual('\x85\x01\x91\x05\x14\x0chas some in '
405
'\x91\x856\x03and\x91s"\x91?\n', delta)
413
407
def test_delta_with_delta_bytes(self):
414
408
di = self._gc_module.DeltaIndex()
416
410
di.add_source(_first_text, 0)
417
411
self.assertEqual(len(_first_text), di._source_offset)
418
412
delta = di.make_delta(_second_text)
419
self.assertEqual(b'h\tsome more\x91\x019'
420
b'&previous text\nand has some extra text\n', delta)
413
self.assertEqual('h\tsome more\x91\x019'
414
'&previous text\nand has some extra text\n', delta)
421
415
di.add_delta_source(delta, 0)
423
417
self.assertEqual(len(_first_text) + len(delta), di._source_offset)
429
423
# Note that we don't match the 'common with the', because it isn't long
430
424
# enough to match in the original text, and those bytes are not present
431
425
# in the delta for the second text.
432
self.assertEqual(b'\x85\x01\x90\x14\x1chas some in common with the '
433
b'\x91S&\x03and\x91\x18,', second_delta)
426
self.assertEqual('\x85\x01\x90\x14\x1chas some in common with the '
427
'\x91S&\x03and\x91\x18,', second_delta)
434
428
# Add this delta, and create a new delta for the same text. We should
435
429
# find the remaining text, and only insert the short 'and' text.
436
430
di.add_delta_source(second_delta, 0)
438
432
third_delta = di.make_delta(_third_text)
439
433
result = self._gc_module.apply_delta(source, third_delta)
440
434
self.assertEqualDiff(_third_text, result)
441
self.assertEqual(b'\x85\x01\x90\x14\x91\x7e\x1c'
442
b'\x91S&\x03and\x91\x18,', third_delta)
435
self.assertEqual('\x85\x01\x90\x14\x91\x7e\x1c'
436
'\x91S&\x03and\x91\x18,', third_delta)
443
437
# Now create a delta, which we know won't be able to be 'fit' into the
445
439
fourth_delta = di.make_delta(_fourth_text)
446
440
self.assertEqual(_fourth_text,
447
441
self._gc_module.apply_delta(source, fourth_delta))
448
self.assertEqual(b'\x80\x01'
449
b'\x7f123456789012345\nsame rabin hash\n'
450
b'123456789012345\nsame rabin hash\n'
451
b'123456789012345\nsame rabin hash\n'
452
b'123456789012345\nsame rabin hash'
453
b'\x01\n', fourth_delta)
442
self.assertEqual('\x80\x01'
443
'\x7f123456789012345\nsame rabin hash\n'
444
'123456789012345\nsame rabin hash\n'
445
'123456789012345\nsame rabin hash\n'
446
'123456789012345\nsame rabin hash'
447
'\x01\n', fourth_delta)
454
448
di.add_delta_source(fourth_delta, 0)
455
449
source += fourth_delta
456
450
# With the next delta, everything should be found
457
451
fifth_delta = di.make_delta(_fourth_text)
458
452
self.assertEqual(_fourth_text,
459
453
self._gc_module.apply_delta(source, fifth_delta))
460
self.assertEqual(b'\x80\x01\x91\xa7\x7f\x01\n', fifth_delta)
454
self.assertEqual('\x80\x01\x91\xa7\x7f\x01\n', fifth_delta)
463
457
class TestCopyInstruction(tests.TestCase):
467
461
self.assertEqual(expected, data)
469
463
def assertDecode(self, exp_offset, exp_length, exp_newpos, data, pos):
464
cmd = indexbytes(data, pos)
472
466
out = _groupcompress_py.decode_copy_instruction(data, cmd, pos)
473
467
self.assertEqual((exp_offset, exp_length, exp_newpos), out)
def test_encode_no_length(self):
    """Copies of exactly 64KiB are encoded with offset bytes only.

    Every case uses a length of 64KiB, so each expected encoding consists
    of the command byte followed by offset bytes alone. The expected values
    show that zero bytes of the offset are left out, that the low bits of
    the command byte flag which offset bytes are present, and that the
    offset bytes are stored least-significant first.
    """
    no_length = 64 * 1024
    cases = [
        (b'\x80', 0),
        (b'\x81\x01', 1),
        (b'\x81\x0a', 10),
        (b'\x81\xff', 255),
        (b'\x82\x01', 256),
        (b'\x83\x01\x01', 257),
        (b'\x8F\xff\xff\xff\xff', 0xFFFFFFFF),
        (b'\x8E\xff\xff\xff', 0xFFFFFF00),
        (b'\x8D\xff\xff\xff', 0xFFFF00FF),
        (b'\x8B\xff\xff\xff', 0xFF00FFFF),
        (b'\x87\xff\xff\xff', 0x00FFFFFF),
        (b'\x8F\x04\x03\x02\x01', 0x01020304),
    ]
    for expected, offset in cases:
        self.assertEncode(expected, offset, no_length)
489
483
def test_encode_no_offset(self):
490
484
self.assertEncode(b'\x90\x01', 0, 1)
496
490
# Special case, if copy == 64KiB, then we store exactly 0
497
491
# Note that this puns with a copy of exactly 0 bytes, but we don't care
498
492
# about that, as we would never actually copy 0 bytes
499
self.assertEncode(b'\x80', 0, 64 * 1024)
493
self.assertEncode(b'\x80', 0, 64*1024)
501
495
def test_encode(self):
502
496
self.assertEncode(b'\x91\x01\x01', 1, 1)
508
502
# Special case, if copy == 64KiB, then we store exactly 0
509
503
# Note that this puns with a copy of exactly 0 bytes, but we don't care
510
504
# about that, as we would never actually copy 0 bytes
511
self.assertEncode(b'\x81\x0a', 10, 64 * 1024)
505
self.assertEncode(b'\x81\x0a', 10, 64*1024)
513
507
def test_decode_no_length(self):
514
508
# If length is 0, it is interpreted as 64KiB