210
212
def test_apply_delta(self):
    """Check that applying a known delta turns one sample text into the other.

    Both directions are exercised: ``_text1`` -> ``_text2`` and back again.
    """
    # Delta expected to rewrite _text1 into _text2.
    target = self.apply_delta(_text1,
                              b'N\x90/\x1fdiffer from\nagainst other text\n')
    self.assertEqual(_text2, target)
    # Delta expected to rewrite _text2 back into _text1.
    target = self.apply_delta(_text2,
                              b'M\x90/\x1ebe matched\nagainst other text\n')
    self.assertEqual(_text1, target)
218
220
def test_apply_delta_to_source_is_safe(self):
219
221
self.assertRaises(TypeError,
220
self.apply_delta_to_source, object(), 0, 1)
222
self.apply_delta_to_source, object(), 0, 1)
221
223
self.assertRaises(TypeError,
222
self.apply_delta_to_source, u'unicode str', 0, 1)
224
self.apply_delta_to_source, u'unicode str', 0, 1)
224
226
self.assertRaises(ValueError,
225
self.apply_delta_to_source, b'foo', 1, 4)
227
self.apply_delta_to_source, b'foo', 1, 4)
227
229
self.assertRaises(ValueError,
228
self.apply_delta_to_source, b'foo', 5, 3)
230
self.apply_delta_to_source, b'foo', 5, 3)
230
232
self.assertRaises(ValueError,
231
self.apply_delta_to_source, b'foo', 3, 2)
233
self.apply_delta_to_source, b'foo', 3, 2)
233
235
def test_apply_delta_to_source(self):
    """Check that a delta stored right after its source can be applied.

    The buffer is ``_text1`` immediately followed by the delta bytes; the
    offsets passed are where the delta starts (``len(_text1)``) and where
    the buffer ends.
    """
    source_and_delta = (_text1
                        + b'N\x90/\x1fdiffer from\nagainst other text\n')
    self.assertEqual(_text2, self.apply_delta_to_source(
        source_and_delta, len(_text1), len(source_and_delta)))
240
242
class TestMakeAndApplyCompatible(tests.TestCase):
242
244
scenarios = two_way_scenarios()
244
make_delta = None # Set by load_tests
245
apply_delta = None # Set by load_tests
246
make_delta = None # Set by load_tests
247
apply_delta = None # Set by load_tests
247
249
def assertMakeAndApply(self, source, target):
248
250
"""Assert that generating a delta and applying gives success."""
273
275
di = self._gc_module.DeltaIndex(b'test text\n')
274
276
self.assertEqual('DeltaIndex(1, 10)', repr(di))
278
def test_sizeof(self):
279
di = self._gc_module.DeltaIndex()
280
# Exact value will depend on platform but should include sources
281
# source_info is a pointer and two longs so at least 12 bytes
282
lower_bound = di._max_num_sources * 12
283
self.assertGreater(sys.getsizeof(di), lower_bound)
276
285
def test__dump_no_index(self):
277
286
di = self._gc_module.DeltaIndex()
278
287
self.assertEqual(None, di._dump_index())
289
298
self.assertEqual(68, len(entry_list))
290
299
just_entries = [(idx, text_offset, hash_val)
291
300
for idx, (text_offset, hash_val)
292
in enumerate(entry_list)
293
if text_offset != 0 or hash_val != 0]
301
in enumerate(entry_list)
302
if text_offset != 0 or hash_val != 0]
294
303
rabin_hash = self._gc_module._rabin_hash
295
304
self.assertEqual([(8, 16, rabin_hash(_text1[1:17])),
296
305
(25, 48, rabin_hash(_text1[33:49])),
297
306
(34, 32, rabin_hash(_text1[17:33])),
298
307
(47, 64, rabin_hash(_text1[49:65])),
300
309
# This ensures that the hash map points to the location we expect it to
301
310
for entry_idx, text_offset, hash_val in just_entries:
302
311
self.assertEqual(entry_idx, hash_list[hash_val & 0xf])
312
321
self.assertEqual(68, len(entry_list))
313
322
just_entries = [(idx, text_offset, hash_val)
314
323
for idx, (text_offset, hash_val)
315
in enumerate(entry_list)
316
if text_offset != 0 or hash_val != 0]
324
in enumerate(entry_list)
325
if text_offset != 0 or hash_val != 0]
317
326
rabin_hash = self._gc_module._rabin_hash
318
327
self.assertEqual([(8, 16, rabin_hash(_text1[1:17])),
319
(9, start2+16, rabin_hash(_text2[1:17])),
328
(9, start2 + 16, rabin_hash(_text2[1:17])),
320
329
(25, 48, rabin_hash(_text1[33:49])),
321
(30, start2+64, rabin_hash(_text2[49:65])),
330
(30, start2 + 64, rabin_hash(_text2[49:65])),
322
331
(34, 32, rabin_hash(_text1[17:33])),
323
(35, start2+32, rabin_hash(_text2[17:33])),
324
(43, start2+48, rabin_hash(_text2[33:49])),
332
(35, start2 + 32, rabin_hash(_text2[17:33])),
333
(43, start2 + 48, rabin_hash(_text2[33:49])),
325
334
(47, 64, rabin_hash(_text1[49:65])),
327
336
# Each entry should be in the appropriate hash bucket.
328
337
for entry_idx, text_offset, hash_val in just_entries:
329
338
hash_idx = hash_val & 0xf
331
hash_list[hash_idx] <= entry_idx < hash_list[hash_idx+1])
340
hash_list[hash_idx] <= entry_idx < hash_list[hash_idx + 1])
333
342
def test_first_add_source_doesnt_index_until_make_delta(self):
334
343
di = self._gc_module.DeltaIndex()
344
353
def test_add_source_max_bytes_to_index(self):
345
354
di = self._gc_module.DeltaIndex()
346
di._max_bytes_to_index = 3*16
347
di.add_source(_text1, 0) # (77 bytes -1) // 3 = 25 byte stride
348
di.add_source(_text3, 3) # (135 bytes -1) // 3 = 44 byte stride
355
di._max_bytes_to_index = 3 * 16
356
di.add_source(_text1, 0) # (77 bytes -1) // 3 = 25 byte stride
357
di.add_source(_text3, 3) # (135 bytes -1) // 3 = 44 byte stride
349
358
start2 = len(_text1) + 3
350
359
hash_list, entry_list = di._dump_index()
351
360
self.assertEqual(16, len(hash_list))
352
361
self.assertEqual(67, len(entry_list))
353
362
just_entries = sorted([(text_offset, hash_val)
354
363
for text_offset, hash_val in entry_list
355
if text_offset != 0 or hash_val != 0])
364
if text_offset != 0 or hash_val != 0])
356
365
rabin_hash = self._gc_module._rabin_hash
357
366
self.assertEqual([(25, rabin_hash(_text1[10:26])),
358
367
(50, rabin_hash(_text1[35:51])),
359
368
(75, rabin_hash(_text1[60:76])),
360
(start2+44, rabin_hash(_text3[29:45])),
361
(start2+88, rabin_hash(_text3[73:89])),
362
(start2+132, rabin_hash(_text3[117:133])),
369
(start2 + 44, rabin_hash(_text3[29:45])),
370
(start2 + 88, rabin_hash(_text3[73:89])),
371
(start2 + 132, rabin_hash(_text3[117:133])),
365
374
def test_second_add_source_triggers_make_index(self):
366
375
di = self._gc_module.DeltaIndex()
467
476
self.assertEqual((exp_offset, exp_length, exp_newpos), out)
469
478
def test_encode_no_length(self):
470
self.assertEncode(b'\x80', 0, 64*1024)
471
self.assertEncode(b'\x81\x01', 1, 64*1024)
472
self.assertEncode(b'\x81\x0a', 10, 64*1024)
473
self.assertEncode(b'\x81\xff', 255, 64*1024)
474
self.assertEncode(b'\x82\x01', 256, 64*1024)
475
self.assertEncode(b'\x83\x01\x01', 257, 64*1024)
476
self.assertEncode(b'\x8F\xff\xff\xff\xff', 0xFFFFFFFF, 64*1024)
477
self.assertEncode(b'\x8E\xff\xff\xff', 0xFFFFFF00, 64*1024)
478
self.assertEncode(b'\x8D\xff\xff\xff', 0xFFFF00FF, 64*1024)
479
self.assertEncode(b'\x8B\xff\xff\xff', 0xFF00FFFF, 64*1024)
480
self.assertEncode(b'\x87\xff\xff\xff', 0x00FFFFFF, 64*1024)
481
self.assertEncode(b'\x8F\x04\x03\x02\x01', 0x01020304, 64*1024)
479
self.assertEncode(b'\x80', 0, 64 * 1024)
480
self.assertEncode(b'\x81\x01', 1, 64 * 1024)
481
self.assertEncode(b'\x81\x0a', 10, 64 * 1024)
482
self.assertEncode(b'\x81\xff', 255, 64 * 1024)
483
self.assertEncode(b'\x82\x01', 256, 64 * 1024)
484
self.assertEncode(b'\x83\x01\x01', 257, 64 * 1024)
485
self.assertEncode(b'\x8F\xff\xff\xff\xff', 0xFFFFFFFF, 64 * 1024)
486
self.assertEncode(b'\x8E\xff\xff\xff', 0xFFFFFF00, 64 * 1024)
487
self.assertEncode(b'\x8D\xff\xff\xff', 0xFFFF00FF, 64 * 1024)
488
self.assertEncode(b'\x8B\xff\xff\xff', 0xFF00FFFF, 64 * 1024)
489
self.assertEncode(b'\x87\xff\xff\xff', 0x00FFFFFF, 64 * 1024)
490
self.assertEncode(b'\x8F\x04\x03\x02\x01', 0x01020304, 64 * 1024)
483
492
def test_encode_no_offset(self):
484
493
self.assertEncode(b'\x90\x01', 0, 1)
490
499
# Special case, if copy == 64KiB, then we store exactly 0
491
500
# Note that this puns with a copy of exactly 0 bytes, but we don't care
492
501
# about that, as we would never actually copy 0 bytes
493
self.assertEncode(b'\x80', 0, 64*1024)
502
self.assertEncode(b'\x80', 0, 64 * 1024)
495
504
def test_encode(self):
496
505
self.assertEncode(b'\x91\x01\x01', 1, 1)
502
511
# Special case, if copy == 64KiB, then we store exactly 0
503
512
# Note that this puns with a copy of exactly 0 bytes, but we don't care
504
513
# about that, as we would never actually copy 0 bytes
505
self.assertEncode(b'\x81\x0a', 10, 64*1024)
514
self.assertEncode(b'\x81\x0a', 10, 64 * 1024)
507
516
def test_decode_no_length(self):
508
517
# If length is 0, it is interpreted as 64KiB