/brz/remove-bazaar

To get this branch, use:
bzr branch http://gegoxaren.bato24.eu/bzr/brz/remove-bazaar

« back to all changes in this revision

Viewing changes to breezy/tests/test__groupcompress.py

  • Committer: Breezy landing bot
  • Author(s): Jelmer Vernooij
  • Date: 2018-11-16 18:59:44 UTC
  • mfrom: (7143.15.15 more-cleanups)
  • Revision ID: breezy.the.bot@gmail.com-20181116185944-biefv1sub37qfybm
Sprinkle some PEP8iness.

Merged from https://code.launchpad.net/~jelmer/brz/more-cleanups/+merge/358611

Show diffs side-by-side

added added

removed removed

Lines of Context:
42
42
    if compiled_groupcompress_feature.available():
43
43
        gc_module = compiled_groupcompress_feature.module
44
44
        scenarios.append(('C',
45
 
            {'_gc_module': gc_module}))
 
45
                          {'_gc_module': gc_module}))
46
46
    return scenarios
47
47
 
48
48
 
131
131
class TestMakeAndApplyDelta(tests.TestCase):
132
132
 
133
133
    scenarios = module_scenarios()
134
 
    _gc_module = None # Set by load_tests
 
134
    _gc_module = None  # Set by load_tests
135
135
 
136
136
    def setUp(self):
137
137
        super(TestMakeAndApplyDelta, self).setUp()
197
197
            b'\xdc\x86\x0a'      # Encoding the length of the uncompressed text
198
198
            b'\x80'              # Copy 64kB, starting at byte 0
199
199
            b'\x84\x01'          # and another 64kB starting at 64kB
200
 
            b'\xb4\x02\x5c\x83', # And the bit of tail.
 
200
            b'\xb4\x02\x5c\x83',  # And the bit of tail.
201
201
            None,   # Both implementations should be identical
202
202
            delta)
203
203
 
211
211
 
212
212
    def test_apply_delta(self):
213
213
        target = self.apply_delta(_text1,
214
 
                    b'N\x90/\x1fdiffer from\nagainst other text\n')
 
214
                                  b'N\x90/\x1fdiffer from\nagainst other text\n')
215
215
        self.assertEqual(_text2, target)
216
216
        target = self.apply_delta(_text2,
217
 
                    b'M\x90/\x1ebe matched\nagainst other text\n')
 
217
                                  b'M\x90/\x1ebe matched\nagainst other text\n')
218
218
        self.assertEqual(_text1, target)
219
219
 
220
220
    def test_apply_delta_to_source_is_safe(self):
221
221
        self.assertRaises(TypeError,
222
 
            self.apply_delta_to_source, object(), 0, 1)
 
222
                          self.apply_delta_to_source, object(), 0, 1)
223
223
        self.assertRaises(TypeError,
224
 
            self.apply_delta_to_source, u'unicode str', 0, 1)
 
224
                          self.apply_delta_to_source, u'unicode str', 0, 1)
225
225
        # end > length
226
226
        self.assertRaises(ValueError,
227
 
            self.apply_delta_to_source, b'foo', 1, 4)
 
227
                          self.apply_delta_to_source, b'foo', 1, 4)
228
228
        # start > length
229
229
        self.assertRaises(ValueError,
230
 
            self.apply_delta_to_source, b'foo', 5, 3)
 
230
                          self.apply_delta_to_source, b'foo', 5, 3)
231
231
        # start > end
232
232
        self.assertRaises(ValueError,
233
 
            self.apply_delta_to_source, b'foo', 3, 2)
 
233
                          self.apply_delta_to_source, b'foo', 3, 2)
234
234
 
235
235
    def test_apply_delta_to_source(self):
236
236
        source_and_delta = (_text1
237
237
                            + b'N\x90/\x1fdiffer from\nagainst other text\n')
238
238
        self.assertEqual(_text2, self.apply_delta_to_source(source_and_delta,
239
 
                                    len(_text1), len(source_and_delta)))
 
239
                                                            len(_text1), len(source_and_delta)))
240
240
 
241
241
 
242
242
class TestMakeAndApplyCompatible(tests.TestCase):
243
243
 
244
244
    scenarios = two_way_scenarios()
245
245
 
246
 
    make_delta = None # Set by load_tests
247
 
    apply_delta = None # Set by load_tests
 
246
    make_delta = None  # Set by load_tests
 
247
    apply_delta = None  # Set by load_tests
248
248
 
249
249
    def assertMakeAndApply(self, source, target):
250
250
        """Assert that generating a delta and applying gives success."""
298
298
        self.assertEqual(68, len(entry_list))
299
299
        just_entries = [(idx, text_offset, hash_val)
300
300
                        for idx, (text_offset, hash_val)
301
 
                         in enumerate(entry_list)
302
 
                         if text_offset != 0 or hash_val != 0]
 
301
                        in enumerate(entry_list)
 
302
                        if text_offset != 0 or hash_val != 0]
303
303
        rabin_hash = self._gc_module._rabin_hash
304
304
        self.assertEqual([(8, 16, rabin_hash(_text1[1:17])),
305
305
                          (25, 48, rabin_hash(_text1[33:49])),
306
306
                          (34, 32, rabin_hash(_text1[17:33])),
307
307
                          (47, 64, rabin_hash(_text1[49:65])),
308
 
                         ], just_entries)
 
308
                          ], just_entries)
309
309
        # This ensures that the hash map points to the location we expect it to
310
310
        for entry_idx, text_offset, hash_val in just_entries:
311
311
            self.assertEqual(entry_idx, hash_list[hash_val & 0xf])
321
321
        self.assertEqual(68, len(entry_list))
322
322
        just_entries = [(idx, text_offset, hash_val)
323
323
                        for idx, (text_offset, hash_val)
324
 
                         in enumerate(entry_list)
325
 
                         if text_offset != 0 or hash_val != 0]
 
324
                        in enumerate(entry_list)
 
325
                        if text_offset != 0 or hash_val != 0]
326
326
        rabin_hash = self._gc_module._rabin_hash
327
327
        self.assertEqual([(8, 16, rabin_hash(_text1[1:17])),
328
 
                          (9, start2+16, rabin_hash(_text2[1:17])),
 
328
                          (9, start2 + 16, rabin_hash(_text2[1:17])),
329
329
                          (25, 48, rabin_hash(_text1[33:49])),
330
 
                          (30, start2+64, rabin_hash(_text2[49:65])),
 
330
                          (30, start2 + 64, rabin_hash(_text2[49:65])),
331
331
                          (34, 32, rabin_hash(_text1[17:33])),
332
 
                          (35, start2+32, rabin_hash(_text2[17:33])),
333
 
                          (43, start2+48, rabin_hash(_text2[33:49])),
 
332
                          (35, start2 + 32, rabin_hash(_text2[17:33])),
 
333
                          (43, start2 + 48, rabin_hash(_text2[33:49])),
334
334
                          (47, 64, rabin_hash(_text1[49:65])),
335
 
                         ], just_entries)
 
335
                          ], just_entries)
336
336
        # Each entry should be in the appropriate hash bucket.
337
337
        for entry_idx, text_offset, hash_val in just_entries:
338
338
            hash_idx = hash_val & 0xf
339
339
            self.assertTrue(
340
 
                hash_list[hash_idx] <= entry_idx < hash_list[hash_idx+1])
 
340
                hash_list[hash_idx] <= entry_idx < hash_list[hash_idx + 1])
341
341
 
342
342
    def test_first_add_source_doesnt_index_until_make_delta(self):
343
343
        di = self._gc_module.DeltaIndex()
352
352
 
353
353
    def test_add_source_max_bytes_to_index(self):
354
354
        di = self._gc_module.DeltaIndex()
355
 
        di._max_bytes_to_index = 3*16
356
 
        di.add_source(_text1, 0) # (77 bytes -1) // 3 = 25 byte stride
357
 
        di.add_source(_text3, 3) # (135 bytes -1) // 3 = 44 byte stride
 
355
        di._max_bytes_to_index = 3 * 16
 
356
        di.add_source(_text1, 0)  # (77 bytes -1) // 3 = 25 byte stride
 
357
        di.add_source(_text3, 3)  # (135 bytes -1) // 3 = 44 byte stride
358
358
        start2 = len(_text1) + 3
359
359
        hash_list, entry_list = di._dump_index()
360
360
        self.assertEqual(16, len(hash_list))
361
361
        self.assertEqual(67, len(entry_list))
362
362
        just_entries = sorted([(text_offset, hash_val)
363
363
                               for text_offset, hash_val in entry_list
364
 
                                if text_offset != 0 or hash_val != 0])
 
364
                               if text_offset != 0 or hash_val != 0])
365
365
        rabin_hash = self._gc_module._rabin_hash
366
366
        self.assertEqual([(25, rabin_hash(_text1[10:26])),
367
367
                          (50, rabin_hash(_text1[35:51])),
368
368
                          (75, rabin_hash(_text1[60:76])),
369
 
                          (start2+44, rabin_hash(_text3[29:45])),
370
 
                          (start2+88, rabin_hash(_text3[73:89])),
371
 
                          (start2+132, rabin_hash(_text3[117:133])),
372
 
                         ], just_entries)
 
369
                          (start2 + 44, rabin_hash(_text3[29:45])),
 
370
                          (start2 + 88, rabin_hash(_text3[73:89])),
 
371
                          (start2 + 132, rabin_hash(_text3[117:133])),
 
372
                          ], just_entries)
373
373
 
374
374
    def test_second_add_source_triggers_make_index(self):
375
375
        di = self._gc_module.DeltaIndex()
476
476
        self.assertEqual((exp_offset, exp_length, exp_newpos), out)
477
477
 
478
478
    def test_encode_no_length(self):
479
 
        self.assertEncode(b'\x80', 0, 64*1024)
480
 
        self.assertEncode(b'\x81\x01', 1, 64*1024)
481
 
        self.assertEncode(b'\x81\x0a', 10, 64*1024)
482
 
        self.assertEncode(b'\x81\xff', 255, 64*1024)
483
 
        self.assertEncode(b'\x82\x01', 256, 64*1024)
484
 
        self.assertEncode(b'\x83\x01\x01', 257, 64*1024)
485
 
        self.assertEncode(b'\x8F\xff\xff\xff\xff', 0xFFFFFFFF, 64*1024)
486
 
        self.assertEncode(b'\x8E\xff\xff\xff', 0xFFFFFF00, 64*1024)
487
 
        self.assertEncode(b'\x8D\xff\xff\xff', 0xFFFF00FF, 64*1024)
488
 
        self.assertEncode(b'\x8B\xff\xff\xff', 0xFF00FFFF, 64*1024)
489
 
        self.assertEncode(b'\x87\xff\xff\xff', 0x00FFFFFF, 64*1024)
490
 
        self.assertEncode(b'\x8F\x04\x03\x02\x01', 0x01020304, 64*1024)
 
479
        self.assertEncode(b'\x80', 0, 64 * 1024)
 
480
        self.assertEncode(b'\x81\x01', 1, 64 * 1024)
 
481
        self.assertEncode(b'\x81\x0a', 10, 64 * 1024)
 
482
        self.assertEncode(b'\x81\xff', 255, 64 * 1024)
 
483
        self.assertEncode(b'\x82\x01', 256, 64 * 1024)
 
484
        self.assertEncode(b'\x83\x01\x01', 257, 64 * 1024)
 
485
        self.assertEncode(b'\x8F\xff\xff\xff\xff', 0xFFFFFFFF, 64 * 1024)
 
486
        self.assertEncode(b'\x8E\xff\xff\xff', 0xFFFFFF00, 64 * 1024)
 
487
        self.assertEncode(b'\x8D\xff\xff\xff', 0xFFFF00FF, 64 * 1024)
 
488
        self.assertEncode(b'\x8B\xff\xff\xff', 0xFF00FFFF, 64 * 1024)
 
489
        self.assertEncode(b'\x87\xff\xff\xff', 0x00FFFFFF, 64 * 1024)
 
490
        self.assertEncode(b'\x8F\x04\x03\x02\x01', 0x01020304, 64 * 1024)
491
491
 
492
492
    def test_encode_no_offset(self):
493
493
        self.assertEncode(b'\x90\x01', 0, 1)
499
499
        # Special case, if copy == 64KiB, then we store exactly 0
500
500
        # Note that this puns with a copy of exactly 0 bytes, but we don't care
501
501
        # about that, as we would never actually copy 0 bytes
502
 
        self.assertEncode(b'\x80', 0, 64*1024)
 
502
        self.assertEncode(b'\x80', 0, 64 * 1024)
503
503
 
504
504
    def test_encode(self):
505
505
        self.assertEncode(b'\x91\x01\x01', 1, 1)
511
511
        # Special case, if copy == 64KiB, then we store exactly 0
512
512
        # Note that this puns with a copy of exactly 0 bytes, but we don't care
513
513
        # about that, as we would never actually copy 0 bytes
514
 
        self.assertEncode(b'\x81\x0a', 10, 64*1024)
 
514
        self.assertEncode(b'\x81\x0a', 10, 64 * 1024)
515
515
 
516
516
    def test_decode_no_length(self):
517
517
        # If length is 0, it is interpreted as 64KiB
559
559
 
560
560
    scenarios = module_scenarios()
561
561
 
562
 
    _gc_module = None # Set by load_tests
 
562
    _gc_module = None  # Set by load_tests
563
563
 
564
564
    def assertEqualEncode(self, bytes, val):
565
565
        self.assertEqual(bytes, self._gc_module.encode_base128_int(val))
591
591
        self.assertEqualDecode(127, 1, b'\x7f\x01')
592
592
        self.assertEqualDecode(128, 2, b'\x80\x01abcdef')
593
593
        self.assertEqualDecode(255, 2, b'\xff\x01\xff')
594
 
 
595