140
125
self.apply_delta_to_source = self._gc_module.apply_delta_to_source
142
127
def test_make_delta_is_typesafe(self):
143
self.make_delta(b'a string', b'another string')
128
self.make_delta('a string', 'another string')
145
130
def _check_make_delta(string1, string2):
146
131
self.assertRaises(TypeError, self.make_delta, string1, string2)
148
_check_make_delta(b'a string', object())
149
_check_make_delta(b'a string', u'not a string')
150
_check_make_delta(object(), b'a string')
151
_check_make_delta(u'not a string', b'a string')
133
_check_make_delta('a string', object())
134
_check_make_delta('a string', u'not a string')
135
_check_make_delta(object(), 'a string')
136
_check_make_delta(u'not a string', 'a string')
153
138
def test_make_noop_delta(self):
154
139
ident_delta = self.make_delta(_text1, _text1)
155
self.assertEqual(b'M\x90M', ident_delta)
140
self.assertEqual('M\x90M', ident_delta)
156
141
ident_delta = self.make_delta(_text2, _text2)
157
self.assertEqual(b'N\x90N', ident_delta)
142
self.assertEqual('N\x90N', ident_delta)
158
143
ident_delta = self.make_delta(_text3, _text3)
159
self.assertEqual(b'\x87\x01\x90\x87', ident_delta)
144
self.assertEqual('\x87\x01\x90\x87', ident_delta)
161
146
def assertDeltaIn(self, delta1, delta2, delta):
162
147
"""Make sure that the delta bytes match one of the expectations."""
163
148
# In general, the python delta matcher gives different results than the
164
149
# pyrex delta matcher. Both should be valid deltas, though.
165
150
if delta not in (delta1, delta2):
166
self.fail(b"Delta bytes:\n"
151
self.fail("Delta bytes:\n"
170
155
% (delta, delta1, delta2))
172
157
def test_make_delta(self):
173
158
delta = self.make_delta(_text1, _text2)
174
159
self.assertDeltaIn(
175
b'N\x90/\x1fdiffer from\nagainst other text\n',
176
b'N\x90\x1d\x1ewhich is meant to differ from\n\x91:\x13',
160
'N\x90/\x1fdiffer from\nagainst other text\n',
161
'N\x90\x1d\x1ewhich is meant to differ from\n\x91:\x13',
178
163
delta = self.make_delta(_text2, _text1)
179
164
self.assertDeltaIn(
180
b'M\x90/\x1ebe matched\nagainst other text\n',
181
b'M\x90\x1d\x1dwhich is meant to be matched\n\x91;\x13',
165
'M\x90/\x1ebe matched\nagainst other text\n',
166
'M\x90\x1d\x1dwhich is meant to be matched\n\x91;\x13',
183
168
delta = self.make_delta(_text3, _text1)
184
self.assertEqual(b'M\x90M', delta)
169
self.assertEqual('M\x90M', delta)
185
170
delta = self.make_delta(_text3, _text2)
186
171
self.assertDeltaIn(
187
b'N\x90/\x1fdiffer from\nagainst other text\n',
188
b'N\x90\x1d\x1ewhich is meant to differ from\n\x91:\x13',
172
'N\x90/\x1fdiffer from\nagainst other text\n',
173
'N\x90\x1d\x1ewhich is meant to differ from\n\x91:\x13',
191
176
def test_make_delta_with_large_copies(self):
194
179
big_text = _text3 * 1220
195
180
delta = self.make_delta(big_text, big_text)
196
181
self.assertDeltaIn(
197
b'\xdc\x86\x0a' # Encoding the length of the uncompressed text
198
b'\x80' # Copy 64kB, starting at byte 0
199
b'\x84\x01' # and another 64kB starting at 64kB
200
b'\xb4\x02\x5c\x83', # And the bit of tail.
182
'\xdc\x86\x0a' # Encoding the length of the uncompressed text
183
'\x80' # Copy 64kB, starting at byte 0
184
'\x84\x01' # and another 64kB starting at 64kB
185
'\xb4\x02\x5c\x83', # And the bit of tail.
201
186
None, # Both implementations should be identical
204
189
def test_apply_delta_is_typesafe(self):
205
self.apply_delta(_text1, b'M\x90M')
206
self.assertRaises(TypeError, self.apply_delta, object(), b'M\x90M')
190
self.apply_delta(_text1, 'M\x90M')
191
self.assertRaises(TypeError, self.apply_delta, object(), 'M\x90M')
207
192
self.assertRaises(TypeError, self.apply_delta,
208
_text1.decode('latin1'), b'M\x90M')
193
unicode(_text1), 'M\x90M')
209
194
self.assertRaises(TypeError, self.apply_delta, _text1, u'M\x90M')
210
195
self.assertRaises(TypeError, self.apply_delta, _text1, object())
212
197
def test_apply_delta(self):
213
198
target = self.apply_delta(_text1,
214
b'N\x90/\x1fdiffer from\nagainst other text\n')
199
'N\x90/\x1fdiffer from\nagainst other text\n')
215
200
self.assertEqual(_text2, target)
216
201
target = self.apply_delta(_text2,
217
b'M\x90/\x1ebe matched\nagainst other text\n')
202
'M\x90/\x1ebe matched\nagainst other text\n')
218
203
self.assertEqual(_text1, target)
220
205
def test_apply_delta_to_source_is_safe(self):
221
206
self.assertRaises(TypeError,
222
self.apply_delta_to_source, object(), 0, 1)
207
self.apply_delta_to_source, object(), 0, 1)
223
208
self.assertRaises(TypeError,
224
self.apply_delta_to_source, u'unicode str', 0, 1)
209
self.apply_delta_to_source, u'unicode str', 0, 1)
226
211
self.assertRaises(ValueError,
227
self.apply_delta_to_source, b'foo', 1, 4)
212
self.apply_delta_to_source, 'foo', 1, 4)
229
214
self.assertRaises(ValueError,
230
self.apply_delta_to_source, b'foo', 5, 3)
215
self.apply_delta_to_source, 'foo', 5, 3)
232
217
self.assertRaises(ValueError,
233
self.apply_delta_to_source, b'foo', 3, 2)
218
self.apply_delta_to_source, 'foo', 3, 2)
235
220
def test_apply_delta_to_source(self):
236
221
source_and_delta = (_text1
237
+ b'N\x90/\x1fdiffer from\nagainst other text\n')
222
+ 'N\x90/\x1fdiffer from\nagainst other text\n')
238
223
self.assertEqual(_text2, self.apply_delta_to_source(source_and_delta,
239
len(_text1), len(source_and_delta)))
224
len(_text1), len(source_and_delta)))
242
227
class TestMakeAndApplyCompatible(tests.TestCase):
244
scenarios = two_way_scenarios()
246
make_delta = None # Set by load_tests
247
apply_delta = None # Set by load_tests
229
make_delta = None # Set by load_tests
230
apply_delta = None # Set by load_tests
249
232
def assertMakeAndApply(self, source, target):
250
233
"""Assert that generating a delta and applying gives success."""
272
255
self._gc_module = compiled_groupcompress_feature.module
274
257
def test_repr(self):
275
di = self._gc_module.DeltaIndex(b'test text\n')
258
di = self._gc_module.DeltaIndex('test text\n')
276
259
self.assertEqual('DeltaIndex(1, 10)', repr(di))
278
def test_sizeof(self):
279
di = self._gc_module.DeltaIndex()
280
# Exact value will depend on platform but should include sources
281
# source_info is a pointer and two longs so at least 12 bytes
282
lower_bound = di._max_num_sources * 12
283
self.assertGreater(sys.getsizeof(di), lower_bound)
285
def test__dump_no_index(self):
286
di = self._gc_module.DeltaIndex()
287
self.assertEqual(None, di._dump_index())
289
def test__dump_index_simple(self):
290
di = self._gc_module.DeltaIndex()
291
di.add_source(_text1, 0)
292
self.assertFalse(di._has_index())
293
self.assertEqual(None, di._dump_index())
294
_ = di.make_delta(_text1)
295
self.assertTrue(di._has_index())
296
hash_list, entry_list = di._dump_index()
297
self.assertEqual(16, len(hash_list))
298
self.assertEqual(68, len(entry_list))
299
just_entries = [(idx, text_offset, hash_val)
300
for idx, (text_offset, hash_val)
301
in enumerate(entry_list)
302
if text_offset != 0 or hash_val != 0]
303
rabin_hash = self._gc_module._rabin_hash
304
self.assertEqual([(8, 16, rabin_hash(_text1[1:17])),
305
(25, 48, rabin_hash(_text1[33:49])),
306
(34, 32, rabin_hash(_text1[17:33])),
307
(47, 64, rabin_hash(_text1[49:65])),
309
# This ensures that the hash map points to the location we expect it to
310
for entry_idx, text_offset, hash_val in just_entries:
311
self.assertEqual(entry_idx, hash_list[hash_val & 0xf])
313
def test__dump_index_two_sources(self):
314
di = self._gc_module.DeltaIndex()
315
di.add_source(_text1, 0)
316
di.add_source(_text2, 2)
317
start2 = len(_text1) + 2
318
self.assertTrue(di._has_index())
319
hash_list, entry_list = di._dump_index()
320
self.assertEqual(16, len(hash_list))
321
self.assertEqual(68, len(entry_list))
322
just_entries = [(idx, text_offset, hash_val)
323
for idx, (text_offset, hash_val)
324
in enumerate(entry_list)
325
if text_offset != 0 or hash_val != 0]
326
rabin_hash = self._gc_module._rabin_hash
327
self.assertEqual([(8, 16, rabin_hash(_text1[1:17])),
328
(9, start2 + 16, rabin_hash(_text2[1:17])),
329
(25, 48, rabin_hash(_text1[33:49])),
330
(30, start2 + 64, rabin_hash(_text2[49:65])),
331
(34, 32, rabin_hash(_text1[17:33])),
332
(35, start2 + 32, rabin_hash(_text2[17:33])),
333
(43, start2 + 48, rabin_hash(_text2[33:49])),
334
(47, 64, rabin_hash(_text1[49:65])),
336
# Each entry should be in the appropriate hash bucket.
337
for entry_idx, text_offset, hash_val in just_entries:
338
hash_idx = hash_val & 0xf
340
hash_list[hash_idx] <= entry_idx < hash_list[hash_idx + 1])
342
261
def test_first_add_source_doesnt_index_until_make_delta(self):
343
262
di = self._gc_module.DeltaIndex()
344
263
self.assertFalse(di._has_index())
441
339
third_delta = di.make_delta(_third_text)
442
340
result = self._gc_module.apply_delta(source, third_delta)
443
341
self.assertEqualDiff(_third_text, result)
444
self.assertEqual(b'\x85\x01\x90\x14\x91\x7e\x1c'
445
b'\x91S&\x03and\x91\x18,', third_delta)
342
self.assertEqual('\x85\x01\x90\x14\x91\x7e\x1c'
343
'\x91S&\x03and\x91\x18,', third_delta)
446
344
# Now create a delta, which we know won't be able to be 'fit' into the
448
346
fourth_delta = di.make_delta(_fourth_text)
449
347
self.assertEqual(_fourth_text,
450
348
self._gc_module.apply_delta(source, fourth_delta))
451
self.assertEqual(b'\x80\x01'
452
b'\x7f123456789012345\nsame rabin hash\n'
453
b'123456789012345\nsame rabin hash\n'
454
b'123456789012345\nsame rabin hash\n'
455
b'123456789012345\nsame rabin hash'
456
b'\x01\n', fourth_delta)
349
self.assertEqual('\x80\x01'
350
'\x7f123456789012345\nsame rabin hash\n'
351
'123456789012345\nsame rabin hash\n'
352
'123456789012345\nsame rabin hash\n'
353
'123456789012345\nsame rabin hash'
354
'\x01\n', fourth_delta)
457
355
di.add_delta_source(fourth_delta, 0)
458
356
source += fourth_delta
459
357
# With the next delta, everything should be found
460
358
fifth_delta = di.make_delta(_fourth_text)
461
359
self.assertEqual(_fourth_text,
462
360
self._gc_module.apply_delta(source, fifth_delta))
463
self.assertEqual(b'\x80\x01\x91\xa7\x7f\x01\n', fifth_delta)
361
self.assertEqual('\x80\x01\x91\xa7\x7f\x01\n', fifth_delta)
466
364
class TestCopyInstruction(tests.TestCase):
468
366
def assertEncode(self, expected, offset, length):
469
data = _groupcompress_py.encode_copy_instruction(offset, length)
470
self.assertEqual(expected, data)
367
bytes = _groupcompress_py.encode_copy_instruction(offset, length)
368
if expected != bytes:
369
self.assertEqual([hex(ord(e)) for e in expected],
370
[hex(ord(b)) for b in bytes])
472
def assertDecode(self, exp_offset, exp_length, exp_newpos, data, pos):
473
cmd = indexbytes(data, pos)
372
def assertDecode(self, exp_offset, exp_length, exp_newpos, bytes, pos):
373
cmd = ord(bytes[pos])
475
out = _groupcompress_py.decode_copy_instruction(data, cmd, pos)
375
out = _groupcompress_py.decode_copy_instruction(bytes, cmd, pos)
476
376
self.assertEqual((exp_offset, exp_length, exp_newpos), out)
478
378
def test_encode_no_length(self):
479
self.assertEncode(b'\x80', 0, 64 * 1024)
480
self.assertEncode(b'\x81\x01', 1, 64 * 1024)
481
self.assertEncode(b'\x81\x0a', 10, 64 * 1024)
482
self.assertEncode(b'\x81\xff', 255, 64 * 1024)
483
self.assertEncode(b'\x82\x01', 256, 64 * 1024)
484
self.assertEncode(b'\x83\x01\x01', 257, 64 * 1024)
485
self.assertEncode(b'\x8F\xff\xff\xff\xff', 0xFFFFFFFF, 64 * 1024)
486
self.assertEncode(b'\x8E\xff\xff\xff', 0xFFFFFF00, 64 * 1024)
487
self.assertEncode(b'\x8D\xff\xff\xff', 0xFFFF00FF, 64 * 1024)
488
self.assertEncode(b'\x8B\xff\xff\xff', 0xFF00FFFF, 64 * 1024)
489
self.assertEncode(b'\x87\xff\xff\xff', 0x00FFFFFF, 64 * 1024)
490
self.assertEncode(b'\x8F\x04\x03\x02\x01', 0x01020304, 64 * 1024)
379
self.assertEncode('\x80', 0, 64*1024)
380
self.assertEncode('\x81\x01', 1, 64*1024)
381
self.assertEncode('\x81\x0a', 10, 64*1024)
382
self.assertEncode('\x81\xff', 255, 64*1024)
383
self.assertEncode('\x82\x01', 256, 64*1024)
384
self.assertEncode('\x83\x01\x01', 257, 64*1024)
385
self.assertEncode('\x8F\xff\xff\xff\xff', 0xFFFFFFFF, 64*1024)
386
self.assertEncode('\x8E\xff\xff\xff', 0xFFFFFF00, 64*1024)
387
self.assertEncode('\x8D\xff\xff\xff', 0xFFFF00FF, 64*1024)
388
self.assertEncode('\x8B\xff\xff\xff', 0xFF00FFFF, 64*1024)
389
self.assertEncode('\x87\xff\xff\xff', 0x00FFFFFF, 64*1024)
390
self.assertEncode('\x8F\x04\x03\x02\x01', 0x01020304, 64*1024)
492
392
def test_encode_no_offset(self):
493
self.assertEncode(b'\x90\x01', 0, 1)
494
self.assertEncode(b'\x90\x0a', 0, 10)
495
self.assertEncode(b'\x90\xff', 0, 255)
496
self.assertEncode(b'\xA0\x01', 0, 256)
497
self.assertEncode(b'\xB0\x01\x01', 0, 257)
498
self.assertEncode(b'\xB0\xff\xff', 0, 0xFFFF)
393
self.assertEncode('\x90\x01', 0, 1)
394
self.assertEncode('\x90\x0a', 0, 10)
395
self.assertEncode('\x90\xff', 0, 255)
396
self.assertEncode('\xA0\x01', 0, 256)
397
self.assertEncode('\xB0\x01\x01', 0, 257)
398
self.assertEncode('\xB0\xff\xff', 0, 0xFFFF)
499
399
# Special case, if copy == 64KiB, then we store exactly 0
500
400
# Note that this puns with a copy of exactly 0 bytes, but we don't care
501
401
# about that, as we would never actually copy 0 bytes
502
self.assertEncode(b'\x80', 0, 64 * 1024)
402
self.assertEncode('\x80', 0, 64*1024)
504
404
def test_encode(self):
505
self.assertEncode(b'\x91\x01\x01', 1, 1)
506
self.assertEncode(b'\x91\x09\x0a', 9, 10)
507
self.assertEncode(b'\x91\xfe\xff', 254, 255)
508
self.assertEncode(b'\xA2\x02\x01', 512, 256)
509
self.assertEncode(b'\xB3\x02\x01\x01\x01', 258, 257)
510
self.assertEncode(b'\xB0\x01\x01', 0, 257)
405
self.assertEncode('\x91\x01\x01', 1, 1)
406
self.assertEncode('\x91\x09\x0a', 9, 10)
407
self.assertEncode('\x91\xfe\xff', 254, 255)
408
self.assertEncode('\xA2\x02\x01', 512, 256)
409
self.assertEncode('\xB3\x02\x01\x01\x01', 258, 257)
410
self.assertEncode('\xB0\x01\x01', 0, 257)
511
411
# Special case, if copy == 64KiB, then we store exactly 0
512
412
# Note that this puns with a copy of exactly 0 bytes, but we don't care
513
413
# about that, as we would never actually copy 0 bytes
514
self.assertEncode(b'\x81\x0a', 10, 64 * 1024)
414
self.assertEncode('\x81\x0a', 10, 64*1024)
516
416
def test_decode_no_length(self):
517
417
# If length is 0, it is interpreted as 64KiB
518
418
# The shortest possible instruction is a copy of 64KiB from offset 0
519
self.assertDecode(0, 65536, 1, b'\x80', 0)
520
self.assertDecode(1, 65536, 2, b'\x81\x01', 0)
521
self.assertDecode(10, 65536, 2, b'\x81\x0a', 0)
522
self.assertDecode(255, 65536, 2, b'\x81\xff', 0)
523
self.assertDecode(256, 65536, 2, b'\x82\x01', 0)
524
self.assertDecode(257, 65536, 3, b'\x83\x01\x01', 0)
525
self.assertDecode(0xFFFFFFFF, 65536, 5, b'\x8F\xff\xff\xff\xff', 0)
526
self.assertDecode(0xFFFFFF00, 65536, 4, b'\x8E\xff\xff\xff', 0)
527
self.assertDecode(0xFFFF00FF, 65536, 4, b'\x8D\xff\xff\xff', 0)
528
self.assertDecode(0xFF00FFFF, 65536, 4, b'\x8B\xff\xff\xff', 0)
529
self.assertDecode(0x00FFFFFF, 65536, 4, b'\x87\xff\xff\xff', 0)
530
self.assertDecode(0x01020304, 65536, 5, b'\x8F\x04\x03\x02\x01', 0)
419
self.assertDecode(0, 65536, 1, '\x80', 0)
420
self.assertDecode(1, 65536, 2, '\x81\x01', 0)
421
self.assertDecode(10, 65536, 2, '\x81\x0a', 0)
422
self.assertDecode(255, 65536, 2, '\x81\xff', 0)
423
self.assertDecode(256, 65536, 2, '\x82\x01', 0)
424
self.assertDecode(257, 65536, 3, '\x83\x01\x01', 0)
425
self.assertDecode(0xFFFFFFFF, 65536, 5, '\x8F\xff\xff\xff\xff', 0)
426
self.assertDecode(0xFFFFFF00, 65536, 4, '\x8E\xff\xff\xff', 0)
427
self.assertDecode(0xFFFF00FF, 65536, 4, '\x8D\xff\xff\xff', 0)
428
self.assertDecode(0xFF00FFFF, 65536, 4, '\x8B\xff\xff\xff', 0)
429
self.assertDecode(0x00FFFFFF, 65536, 4, '\x87\xff\xff\xff', 0)
430
self.assertDecode(0x01020304, 65536, 5, '\x8F\x04\x03\x02\x01', 0)
532
432
def test_decode_no_offset(self):
533
self.assertDecode(0, 1, 2, b'\x90\x01', 0)
534
self.assertDecode(0, 10, 2, b'\x90\x0a', 0)
535
self.assertDecode(0, 255, 2, b'\x90\xff', 0)
536
self.assertDecode(0, 256, 2, b'\xA0\x01', 0)
537
self.assertDecode(0, 257, 3, b'\xB0\x01\x01', 0)
538
self.assertDecode(0, 65535, 3, b'\xB0\xff\xff', 0)
433
self.assertDecode(0, 1, 2, '\x90\x01', 0)
434
self.assertDecode(0, 10, 2, '\x90\x0a', 0)
435
self.assertDecode(0, 255, 2, '\x90\xff', 0)
436
self.assertDecode(0, 256, 2, '\xA0\x01', 0)
437
self.assertDecode(0, 257, 3, '\xB0\x01\x01', 0)
438
self.assertDecode(0, 65535, 3, '\xB0\xff\xff', 0)
539
439
# Special case, if copy == 64KiB, then we store exactly 0
540
440
# Note that this puns with a copy of exactly 0 bytes, but we don't care
541
441
# about that, as we would never actually copy 0 bytes
542
self.assertDecode(0, 65536, 1, b'\x80', 0)
442
self.assertDecode(0, 65536, 1, '\x80', 0)
544
444
def test_decode(self):
545
self.assertDecode(1, 1, 3, b'\x91\x01\x01', 0)
546
self.assertDecode(9, 10, 3, b'\x91\x09\x0a', 0)
547
self.assertDecode(254, 255, 3, b'\x91\xfe\xff', 0)
548
self.assertDecode(512, 256, 3, b'\xA2\x02\x01', 0)
549
self.assertDecode(258, 257, 5, b'\xB3\x02\x01\x01\x01', 0)
550
self.assertDecode(0, 257, 3, b'\xB0\x01\x01', 0)
445
self.assertDecode(1, 1, 3, '\x91\x01\x01', 0)
446
self.assertDecode(9, 10, 3, '\x91\x09\x0a', 0)
447
self.assertDecode(254, 255, 3, '\x91\xfe\xff', 0)
448
self.assertDecode(512, 256, 3, '\xA2\x02\x01', 0)
449
self.assertDecode(258, 257, 5, '\xB3\x02\x01\x01\x01', 0)
450
self.assertDecode(0, 257, 3, '\xB0\x01\x01', 0)
552
452
def test_decode_not_start(self):
553
self.assertDecode(1, 1, 6, b'abc\x91\x01\x01def', 3)
554
self.assertDecode(9, 10, 5, b'ab\x91\x09\x0ade', 2)
555
self.assertDecode(254, 255, 6, b'not\x91\xfe\xffcopy', 3)
453
self.assertDecode(1, 1, 6, 'abc\x91\x01\x01def', 3)
454
self.assertDecode(9, 10, 5, 'ab\x91\x09\x0ade', 2)
455
self.assertDecode(254, 255, 6, 'not\x91\xfe\xffcopy', 3)
558
458
class TestBase128Int(tests.TestCase):
560
scenarios = module_scenarios()
562
_gc_module = None # Set by load_tests
460
_gc_module = None # Set by load_tests
564
462
def assertEqualEncode(self, bytes, val):
565
463
self.assertEqual(bytes, self._gc_module.encode_base128_int(val))