/brz/remove-bazaar

To get this branch, use:
bzr branch http://gegoxaren.bato24.eu/bzr/brz/remove-bazaar

« back to all changes in this revision

Viewing changes to breezy/tests/test__groupcompress.py

  • Committer: Jelmer Vernooij
  • Date: 2018-02-18 21:42:57 UTC
  • mto: This revision was merged to the branch mainline in revision 6859.
  • Revision ID: jelmer@jelmer.uk-20180218214257-jpevutp1wa30tz3v
Update TODO to reference Breezy, not Bazaar.

Show diffs side-by-side

added added

removed removed

Lines of Context:
16
16
 
17
17
"""Tests for the python and pyrex extensions of groupcompress"""
18
18
 
19
 
import sys
20
 
 
21
 
from ... import (
 
19
from .. import (
22
20
    tests,
23
21
    )
24
 
from .. import (
 
22
from ..bzr import (
25
23
    _groupcompress_py,
26
24
    )
27
 
from ...tests.scenarios import (
 
25
from .scenarios import (
28
26
    load_tests_apply_scenarios,
29
27
    )
30
 
from ...tests import (
 
28
from ..sixish import (
 
29
    indexbytes,
 
30
    )
 
31
from . import (
31
32
    features,
32
33
    )
33
34
 
39
40
    if compiled_groupcompress_feature.available():
40
41
        gc_module = compiled_groupcompress_feature.module
41
42
        scenarios.append(('C',
42
 
                          {'_gc_module': gc_module}))
 
43
            {'_gc_module': gc_module}))
43
44
    return scenarios
44
45
 
45
46
 
128
129
class TestMakeAndApplyDelta(tests.TestCase):
129
130
 
130
131
    scenarios = module_scenarios()
131
 
    _gc_module = None  # Set by load_tests
 
132
    _gc_module = None # Set by load_tests
132
133
 
133
134
    def setUp(self):
134
135
        super(TestMakeAndApplyDelta, self).setUp()
194
195
            b'\xdc\x86\x0a'      # Encoding the length of the uncompressed text
195
196
            b'\x80'              # Copy 64kB, starting at byte 0
196
197
            b'\x84\x01'          # and another 64kB starting at 64kB
197
 
            b'\xb4\x02\x5c\x83',  # And the bit of tail.
 
198
            b'\xb4\x02\x5c\x83', # And the bit of tail.
198
199
            None,   # Both implementations should be identical
199
200
            delta)
200
201
 
208
209
 
209
210
    def test_apply_delta(self):
210
211
        target = self.apply_delta(_text1,
211
 
                                  b'N\x90/\x1fdiffer from\nagainst other text\n')
 
212
                    b'N\x90/\x1fdiffer from\nagainst other text\n')
212
213
        self.assertEqual(_text2, target)
213
214
        target = self.apply_delta(_text2,
214
 
                                  b'M\x90/\x1ebe matched\nagainst other text\n')
 
215
                    b'M\x90/\x1ebe matched\nagainst other text\n')
215
216
        self.assertEqual(_text1, target)
216
217
 
217
218
    def test_apply_delta_to_source_is_safe(self):
218
219
        self.assertRaises(TypeError,
219
 
                          self.apply_delta_to_source, object(), 0, 1)
 
220
            self.apply_delta_to_source, object(), 0, 1)
220
221
        self.assertRaises(TypeError,
221
 
                          self.apply_delta_to_source, u'unicode str', 0, 1)
 
222
            self.apply_delta_to_source, u'unicode str', 0, 1)
222
223
        # end > length
223
224
        self.assertRaises(ValueError,
224
 
                          self.apply_delta_to_source, b'foo', 1, 4)
 
225
            self.apply_delta_to_source, b'foo', 1, 4)
225
226
        # start > length
226
227
        self.assertRaises(ValueError,
227
 
                          self.apply_delta_to_source, b'foo', 5, 3)
 
228
            self.apply_delta_to_source, b'foo', 5, 3)
228
229
        # start > end
229
230
        self.assertRaises(ValueError,
230
 
                          self.apply_delta_to_source, b'foo', 3, 2)
 
231
            self.apply_delta_to_source, b'foo', 3, 2)
231
232
 
232
233
    def test_apply_delta_to_source(self):
233
234
        source_and_delta = (_text1
234
235
                            + b'N\x90/\x1fdiffer from\nagainst other text\n')
235
236
        self.assertEqual(_text2, self.apply_delta_to_source(source_and_delta,
236
 
                                                            len(_text1), len(source_and_delta)))
 
237
                                    len(_text1), len(source_and_delta)))
237
238
 
238
239
 
239
240
class TestMakeAndApplyCompatible(tests.TestCase):
240
241
 
241
242
    scenarios = two_way_scenarios()
242
243
 
243
 
    make_delta = None  # Set by load_tests
244
 
    apply_delta = None  # Set by load_tests
 
244
    make_delta = None # Set by load_tests
 
245
    apply_delta = None # Set by load_tests
245
246
 
246
247
    def assertMakeAndApply(self, source, target):
247
248
        """Assert that generating a delta and applying gives success."""
269
270
        self._gc_module = compiled_groupcompress_feature.module
270
271
 
271
272
    def test_repr(self):
272
 
        di = self._gc_module.DeltaIndex(b'test text\n')
 
273
        di = self._gc_module.DeltaIndex('test text\n')
273
274
        self.assertEqual('DeltaIndex(1, 10)', repr(di))
274
275
 
275
 
    def test_sizeof(self):
276
 
        di = self._gc_module.DeltaIndex()
277
 
        # Exact value will depend on platform but should include sources
278
 
        # source_info is a pointer and two longs so at least 12 bytes
279
 
        lower_bound = di._max_num_sources * 12
280
 
        self.assertGreater(sys.getsizeof(di), lower_bound)
281
 
 
282
276
    def test__dump_no_index(self):
283
277
        di = self._gc_module.DeltaIndex()
284
278
        self.assertEqual(None, di._dump_index())
295
289
        self.assertEqual(68, len(entry_list))
296
290
        just_entries = [(idx, text_offset, hash_val)
297
291
                        for idx, (text_offset, hash_val)
298
 
                        in enumerate(entry_list)
299
 
                        if text_offset != 0 or hash_val != 0]
 
292
                         in enumerate(entry_list)
 
293
                         if text_offset != 0 or hash_val != 0]
300
294
        rabin_hash = self._gc_module._rabin_hash
301
295
        self.assertEqual([(8, 16, rabin_hash(_text1[1:17])),
302
296
                          (25, 48, rabin_hash(_text1[33:49])),
303
297
                          (34, 32, rabin_hash(_text1[17:33])),
304
298
                          (47, 64, rabin_hash(_text1[49:65])),
305
 
                          ], just_entries)
 
299
                         ], just_entries)
306
300
        # This ensures that the hash map points to the location we expect it to
307
301
        for entry_idx, text_offset, hash_val in just_entries:
308
302
            self.assertEqual(entry_idx, hash_list[hash_val & 0xf])
318
312
        self.assertEqual(68, len(entry_list))
319
313
        just_entries = [(idx, text_offset, hash_val)
320
314
                        for idx, (text_offset, hash_val)
321
 
                        in enumerate(entry_list)
322
 
                        if text_offset != 0 or hash_val != 0]
 
315
                         in enumerate(entry_list)
 
316
                         if text_offset != 0 or hash_val != 0]
323
317
        rabin_hash = self._gc_module._rabin_hash
324
318
        self.assertEqual([(8, 16, rabin_hash(_text1[1:17])),
325
 
                          (9, start2 + 16, rabin_hash(_text2[1:17])),
 
319
                          (9, start2+16, rabin_hash(_text2[1:17])),
326
320
                          (25, 48, rabin_hash(_text1[33:49])),
327
 
                          (30, start2 + 64, rabin_hash(_text2[49:65])),
 
321
                          (30, start2+64, rabin_hash(_text2[49:65])),
328
322
                          (34, 32, rabin_hash(_text1[17:33])),
329
 
                          (35, start2 + 32, rabin_hash(_text2[17:33])),
330
 
                          (43, start2 + 48, rabin_hash(_text2[33:49])),
 
323
                          (35, start2+32, rabin_hash(_text2[17:33])),
 
324
                          (43, start2+48, rabin_hash(_text2[33:49])),
331
325
                          (47, 64, rabin_hash(_text1[49:65])),
332
 
                          ], just_entries)
 
326
                         ], just_entries)
333
327
        # Each entry should be in the appropriate hash bucket.
334
328
        for entry_idx, text_offset, hash_val in just_entries:
335
329
            hash_idx = hash_val & 0xf
336
330
            self.assertTrue(
337
 
                hash_list[hash_idx] <= entry_idx < hash_list[hash_idx + 1])
 
331
                hash_list[hash_idx] <= entry_idx < hash_list[hash_idx+1])
338
332
 
339
333
    def test_first_add_source_doesnt_index_until_make_delta(self):
340
334
        di = self._gc_module.DeltaIndex()
345
339
        # generated, and will generate a proper delta
346
340
        delta = di.make_delta(_text2)
347
341
        self.assertTrue(di._has_index())
348
 
        self.assertEqual(b'N\x90/\x1fdiffer from\nagainst other text\n', delta)
 
342
        self.assertEqual('N\x90/\x1fdiffer from\nagainst other text\n', delta)
349
343
 
350
344
    def test_add_source_max_bytes_to_index(self):
351
345
        di = self._gc_module.DeltaIndex()
352
 
        di._max_bytes_to_index = 3 * 16
353
 
        di.add_source(_text1, 0)  # (77 bytes -1) // 3 = 25 byte stride
354
 
        di.add_source(_text3, 3)  # (135 bytes -1) // 3 = 44 byte stride
 
346
        di._max_bytes_to_index = 3*16
 
347
        di.add_source(_text1, 0) # (77 bytes -1) // 3 = 25 byte stride
 
348
        di.add_source(_text3, 3) # (135 bytes -1) // 3 = 44 byte stride
355
349
        start2 = len(_text1) + 3
356
350
        hash_list, entry_list = di._dump_index()
357
351
        self.assertEqual(16, len(hash_list))
358
352
        self.assertEqual(67, len(entry_list))
359
353
        just_entries = sorted([(text_offset, hash_val)
360
354
                               for text_offset, hash_val in entry_list
361
 
                               if text_offset != 0 or hash_val != 0])
 
355
                                if text_offset != 0 or hash_val != 0])
362
356
        rabin_hash = self._gc_module._rabin_hash
363
357
        self.assertEqual([(25, rabin_hash(_text1[10:26])),
364
358
                          (50, rabin_hash(_text1[35:51])),
365
359
                          (75, rabin_hash(_text1[60:76])),
366
 
                          (start2 + 44, rabin_hash(_text3[29:45])),
367
 
                          (start2 + 88, rabin_hash(_text3[73:89])),
368
 
                          (start2 + 132, rabin_hash(_text3[117:133])),
369
 
                          ], just_entries)
 
360
                          (start2+44, rabin_hash(_text3[29:45])),
 
361
                          (start2+88, rabin_hash(_text3[73:89])),
 
362
                          (start2+132, rabin_hash(_text3[117:133])),
 
363
                         ], just_entries)
370
364
 
371
365
    def test_second_add_source_triggers_make_index(self):
372
366
        di = self._gc_module.DeltaIndex()
379
373
    def test_make_delta(self):
380
374
        di = self._gc_module.DeltaIndex(_text1)
381
375
        delta = di.make_delta(_text2)
382
 
        self.assertEqual(b'N\x90/\x1fdiffer from\nagainst other text\n', delta)
 
376
        self.assertEqual('N\x90/\x1fdiffer from\nagainst other text\n', delta)
383
377
 
384
378
    def test_delta_against_multiple_sources(self):
385
379
        di = self._gc_module.DeltaIndex()
391
385
        delta = di.make_delta(_third_text)
392
386
        result = self._gc_module.apply_delta(_first_text + _second_text, delta)
393
387
        self.assertEqualDiff(_third_text, result)
394
 
        self.assertEqual(b'\x85\x01\x90\x14\x0chas some in '
395
 
                         b'\x91v6\x03and\x91d"\x91:\n', delta)
 
388
        self.assertEqual('\x85\x01\x90\x14\x0chas some in '
 
389
                         '\x91v6\x03and\x91d"\x91:\n', delta)
396
390
 
397
391
    def test_delta_with_offsets(self):
398
392
        di = self._gc_module.DeltaIndex()
404
398
        delta = di.make_delta(_third_text)
405
399
        self.assertIsNot(None, delta)
406
400
        result = self._gc_module.apply_delta(
407
 
            b'12345' + _first_text + b'1234567890' + _second_text, delta)
 
401
            '12345' + _first_text + '1234567890' + _second_text, delta)
408
402
        self.assertIsNot(None, result)
409
403
        self.assertEqualDiff(_third_text, result)
410
 
        self.assertEqual(b'\x85\x01\x91\x05\x14\x0chas some in '
411
 
                         b'\x91\x856\x03and\x91s"\x91?\n', delta)
 
404
        self.assertEqual('\x85\x01\x91\x05\x14\x0chas some in '
 
405
                         '\x91\x856\x03and\x91s"\x91?\n', delta)
412
406
 
413
407
    def test_delta_with_delta_bytes(self):
414
408
        di = self._gc_module.DeltaIndex()
416
410
        di.add_source(_first_text, 0)
417
411
        self.assertEqual(len(_first_text), di._source_offset)
418
412
        delta = di.make_delta(_second_text)
419
 
        self.assertEqual(b'h\tsome more\x91\x019'
420
 
                         b'&previous text\nand has some extra text\n', delta)
 
413
        self.assertEqual('h\tsome more\x91\x019'
 
414
                         '&previous text\nand has some extra text\n', delta)
421
415
        di.add_delta_source(delta, 0)
422
416
        source += delta
423
417
        self.assertEqual(len(_first_text) + len(delta), di._source_offset)
429
423
        # Note that we don't match the 'common with the', because it isn't long
430
424
        # enough to match in the original text, and those bytes are not present
431
425
        # in the delta for the second text.
432
 
        self.assertEqual(b'\x85\x01\x90\x14\x1chas some in common with the '
433
 
                         b'\x91S&\x03and\x91\x18,', second_delta)
 
426
        self.assertEqual('\x85\x01\x90\x14\x1chas some in common with the '
 
427
                         '\x91S&\x03and\x91\x18,', second_delta)
434
428
        # Add this delta, and create a new delta for the same text. We should
435
429
        # find the remaining text, and only insert the short 'and' text.
436
430
        di.add_delta_source(second_delta, 0)
438
432
        third_delta = di.make_delta(_third_text)
439
433
        result = self._gc_module.apply_delta(source, third_delta)
440
434
        self.assertEqualDiff(_third_text, result)
441
 
        self.assertEqual(b'\x85\x01\x90\x14\x91\x7e\x1c'
442
 
                         b'\x91S&\x03and\x91\x18,', third_delta)
 
435
        self.assertEqual('\x85\x01\x90\x14\x91\x7e\x1c'
 
436
                         '\x91S&\x03and\x91\x18,', third_delta)
443
437
        # Now create a delta, which we know won't be able to be 'fit' into the
444
438
        # existing index
445
439
        fourth_delta = di.make_delta(_fourth_text)
446
440
        self.assertEqual(_fourth_text,
447
441
                         self._gc_module.apply_delta(source, fourth_delta))
448
 
        self.assertEqual(b'\x80\x01'
449
 
                         b'\x7f123456789012345\nsame rabin hash\n'
450
 
                         b'123456789012345\nsame rabin hash\n'
451
 
                         b'123456789012345\nsame rabin hash\n'
452
 
                         b'123456789012345\nsame rabin hash'
453
 
                         b'\x01\n', fourth_delta)
 
442
        self.assertEqual('\x80\x01'
 
443
                         '\x7f123456789012345\nsame rabin hash\n'
 
444
                         '123456789012345\nsame rabin hash\n'
 
445
                         '123456789012345\nsame rabin hash\n'
 
446
                         '123456789012345\nsame rabin hash'
 
447
                         '\x01\n', fourth_delta)
454
448
        di.add_delta_source(fourth_delta, 0)
455
449
        source += fourth_delta
456
450
        # With the next delta, everything should be found
457
451
        fifth_delta = di.make_delta(_fourth_text)
458
452
        self.assertEqual(_fourth_text,
459
453
                         self._gc_module.apply_delta(source, fifth_delta))
460
 
        self.assertEqual(b'\x80\x01\x91\xa7\x7f\x01\n', fifth_delta)
 
454
        self.assertEqual('\x80\x01\x91\xa7\x7f\x01\n', fifth_delta)
461
455
 
462
456
 
463
457
class TestCopyInstruction(tests.TestCase):
467
461
        self.assertEqual(expected, data)
468
462
 
469
463
    def assertDecode(self, exp_offset, exp_length, exp_newpos, data, pos):
470
 
        cmd = data[pos]
 
464
        cmd = indexbytes(data, pos)
471
465
        pos += 1
472
466
        out = _groupcompress_py.decode_copy_instruction(data, cmd, pos)
473
467
        self.assertEqual((exp_offset, exp_length, exp_newpos), out)
474
468
 
475
469
    def test_encode_no_length(self):
476
 
        self.assertEncode(b'\x80', 0, 64 * 1024)
477
 
        self.assertEncode(b'\x81\x01', 1, 64 * 1024)
478
 
        self.assertEncode(b'\x81\x0a', 10, 64 * 1024)
479
 
        self.assertEncode(b'\x81\xff', 255, 64 * 1024)
480
 
        self.assertEncode(b'\x82\x01', 256, 64 * 1024)
481
 
        self.assertEncode(b'\x83\x01\x01', 257, 64 * 1024)
482
 
        self.assertEncode(b'\x8F\xff\xff\xff\xff', 0xFFFFFFFF, 64 * 1024)
483
 
        self.assertEncode(b'\x8E\xff\xff\xff', 0xFFFFFF00, 64 * 1024)
484
 
        self.assertEncode(b'\x8D\xff\xff\xff', 0xFFFF00FF, 64 * 1024)
485
 
        self.assertEncode(b'\x8B\xff\xff\xff', 0xFF00FFFF, 64 * 1024)
486
 
        self.assertEncode(b'\x87\xff\xff\xff', 0x00FFFFFF, 64 * 1024)
487
 
        self.assertEncode(b'\x8F\x04\x03\x02\x01', 0x01020304, 64 * 1024)
 
470
        self.assertEncode(b'\x80', 0, 64*1024)
 
471
        self.assertEncode(b'\x81\x01', 1, 64*1024)
 
472
        self.assertEncode(b'\x81\x0a', 10, 64*1024)
 
473
        self.assertEncode(b'\x81\xff', 255, 64*1024)
 
474
        self.assertEncode(b'\x82\x01', 256, 64*1024)
 
475
        self.assertEncode(b'\x83\x01\x01', 257, 64*1024)
 
476
        self.assertEncode(b'\x8F\xff\xff\xff\xff', 0xFFFFFFFF, 64*1024)
 
477
        self.assertEncode(b'\x8E\xff\xff\xff', 0xFFFFFF00, 64*1024)
 
478
        self.assertEncode(b'\x8D\xff\xff\xff', 0xFFFF00FF, 64*1024)
 
479
        self.assertEncode(b'\x8B\xff\xff\xff', 0xFF00FFFF, 64*1024)
 
480
        self.assertEncode(b'\x87\xff\xff\xff', 0x00FFFFFF, 64*1024)
 
481
        self.assertEncode(b'\x8F\x04\x03\x02\x01', 0x01020304, 64*1024)
488
482
 
489
483
    def test_encode_no_offset(self):
490
484
        self.assertEncode(b'\x90\x01', 0, 1)
496
490
        # Special case, if copy == 64KiB, then we store exactly 0
497
491
        # Note that this puns with a copy of exactly 0 bytes, but we don't care
498
492
        # about that, as we would never actually copy 0 bytes
499
 
        self.assertEncode(b'\x80', 0, 64 * 1024)
 
493
        self.assertEncode(b'\x80', 0, 64*1024)
500
494
 
501
495
    def test_encode(self):
502
496
        self.assertEncode(b'\x91\x01\x01', 1, 1)
508
502
        # Special case, if copy == 64KiB, then we store exactly 0
509
503
        # Note that this puns with a copy of exactly 0 bytes, but we don't care
510
504
        # about that, as we would never actually copy 0 bytes
511
 
        self.assertEncode(b'\x81\x0a', 10, 64 * 1024)
 
505
        self.assertEncode(b'\x81\x0a', 10, 64*1024)
512
506
 
513
507
    def test_decode_no_length(self):
514
508
        # If length is 0, it is interpreted as 64KiB
556
550
 
557
551
    scenarios = module_scenarios()
558
552
 
559
 
    _gc_module = None  # Set by load_tests
 
553
    _gc_module = None # Set by load_tests
560
554
 
561
555
    def assertEqualEncode(self, bytes, val):
562
556
        self.assertEqual(bytes, self._gc_module.encode_base128_int(val))
588
582
        self.assertEqualDecode(127, 1, b'\x7f\x01')
589
583
        self.assertEqualDecode(128, 2, b'\x80\x01abcdef')
590
584
        self.assertEqualDecode(255, 2, b'\xff\x01\xff')
 
585
 
 
586