/brz/remove-bazaar

To get this branch, use:
bzr branch http://gegoxaren.bato24.eu/bzr/brz/remove-bazaar

« back to all changes in this revision

Viewing changes to breezy/tests/test__groupcompress.py

  • Committer: Jelmer Vernooij
  • Date: 2018-11-11 04:08:32 UTC
  • mto: (7143.16.20 even-more-cleanups)
  • mto: This revision was merged to the branch mainline in revision 7175.
  • Revision ID: jelmer@jelmer.uk-20181111040832-nsljjynzzwmznf3h
Commit message: Run autopep8.

Show diffs side-by-side

added

removed

Lines of Context:
40
40
    if compiled_groupcompress_feature.available():
41
41
        gc_module = compiled_groupcompress_feature.module
42
42
        scenarios.append(('C',
43
 
            {'_gc_module': gc_module}))
 
43
                          {'_gc_module': gc_module}))
44
44
    return scenarios
45
45
 
46
46
 
129
129
class TestMakeAndApplyDelta(tests.TestCase):
130
130
 
131
131
    scenarios = module_scenarios()
132
 
    _gc_module = None # Set by load_tests
 
132
    _gc_module = None  # Set by load_tests
133
133
 
134
134
    def setUp(self):
135
135
        super(TestMakeAndApplyDelta, self).setUp()
195
195
            b'\xdc\x86\x0a'      # Encoding the length of the uncompressed text
196
196
            b'\x80'              # Copy 64kB, starting at byte 0
197
197
            b'\x84\x01'          # and another 64kB starting at 64kB
198
 
            b'\xb4\x02\x5c\x83', # And the bit of tail.
 
198
            b'\xb4\x02\x5c\x83',  # And the bit of tail.
199
199
            None,   # Both implementations should be identical
200
200
            delta)
201
201
 
209
209
 
210
210
    def test_apply_delta(self):
211
211
        target = self.apply_delta(_text1,
212
 
                    b'N\x90/\x1fdiffer from\nagainst other text\n')
 
212
                                  b'N\x90/\x1fdiffer from\nagainst other text\n')
213
213
        self.assertEqual(_text2, target)
214
214
        target = self.apply_delta(_text2,
215
 
                    b'M\x90/\x1ebe matched\nagainst other text\n')
 
215
                                  b'M\x90/\x1ebe matched\nagainst other text\n')
216
216
        self.assertEqual(_text1, target)
217
217
 
218
218
    def test_apply_delta_to_source_is_safe(self):
219
219
        self.assertRaises(TypeError,
220
 
            self.apply_delta_to_source, object(), 0, 1)
 
220
                          self.apply_delta_to_source, object(), 0, 1)
221
221
        self.assertRaises(TypeError,
222
 
            self.apply_delta_to_source, u'unicode str', 0, 1)
 
222
                          self.apply_delta_to_source, u'unicode str', 0, 1)
223
223
        # end > length
224
224
        self.assertRaises(ValueError,
225
 
            self.apply_delta_to_source, b'foo', 1, 4)
 
225
                          self.apply_delta_to_source, b'foo', 1, 4)
226
226
        # start > length
227
227
        self.assertRaises(ValueError,
228
 
            self.apply_delta_to_source, b'foo', 5, 3)
 
228
                          self.apply_delta_to_source, b'foo', 5, 3)
229
229
        # start > end
230
230
        self.assertRaises(ValueError,
231
 
            self.apply_delta_to_source, b'foo', 3, 2)
 
231
                          self.apply_delta_to_source, b'foo', 3, 2)
232
232
 
233
233
    def test_apply_delta_to_source(self):
234
234
        source_and_delta = (_text1
235
235
                            + b'N\x90/\x1fdiffer from\nagainst other text\n')
236
236
        self.assertEqual(_text2, self.apply_delta_to_source(source_and_delta,
237
 
                                    len(_text1), len(source_and_delta)))
 
237
                                                            len(_text1), len(source_and_delta)))
238
238
 
239
239
 
240
240
class TestMakeAndApplyCompatible(tests.TestCase):
241
241
 
242
242
    scenarios = two_way_scenarios()
243
243
 
244
 
    make_delta = None # Set by load_tests
245
 
    apply_delta = None # Set by load_tests
 
244
    make_delta = None  # Set by load_tests
 
245
    apply_delta = None  # Set by load_tests
246
246
 
247
247
    def assertMakeAndApply(self, source, target):
248
248
        """Assert that generating a delta and applying gives success."""
289
289
        self.assertEqual(68, len(entry_list))
290
290
        just_entries = [(idx, text_offset, hash_val)
291
291
                        for idx, (text_offset, hash_val)
292
 
                         in enumerate(entry_list)
293
 
                         if text_offset != 0 or hash_val != 0]
 
292
                        in enumerate(entry_list)
 
293
                        if text_offset != 0 or hash_val != 0]
294
294
        rabin_hash = self._gc_module._rabin_hash
295
295
        self.assertEqual([(8, 16, rabin_hash(_text1[1:17])),
296
296
                          (25, 48, rabin_hash(_text1[33:49])),
297
297
                          (34, 32, rabin_hash(_text1[17:33])),
298
298
                          (47, 64, rabin_hash(_text1[49:65])),
299
 
                         ], just_entries)
 
299
                          ], just_entries)
300
300
        # This ensures that the hash map points to the location we expect it to
301
301
        for entry_idx, text_offset, hash_val in just_entries:
302
302
            self.assertEqual(entry_idx, hash_list[hash_val & 0xf])
312
312
        self.assertEqual(68, len(entry_list))
313
313
        just_entries = [(idx, text_offset, hash_val)
314
314
                        for idx, (text_offset, hash_val)
315
 
                         in enumerate(entry_list)
316
 
                         if text_offset != 0 or hash_val != 0]
 
315
                        in enumerate(entry_list)
 
316
                        if text_offset != 0 or hash_val != 0]
317
317
        rabin_hash = self._gc_module._rabin_hash
318
318
        self.assertEqual([(8, 16, rabin_hash(_text1[1:17])),
319
 
                          (9, start2+16, rabin_hash(_text2[1:17])),
 
319
                          (9, start2 + 16, rabin_hash(_text2[1:17])),
320
320
                          (25, 48, rabin_hash(_text1[33:49])),
321
 
                          (30, start2+64, rabin_hash(_text2[49:65])),
 
321
                          (30, start2 + 64, rabin_hash(_text2[49:65])),
322
322
                          (34, 32, rabin_hash(_text1[17:33])),
323
 
                          (35, start2+32, rabin_hash(_text2[17:33])),
324
 
                          (43, start2+48, rabin_hash(_text2[33:49])),
 
323
                          (35, start2 + 32, rabin_hash(_text2[17:33])),
 
324
                          (43, start2 + 48, rabin_hash(_text2[33:49])),
325
325
                          (47, 64, rabin_hash(_text1[49:65])),
326
 
                         ], just_entries)
 
326
                          ], just_entries)
327
327
        # Each entry should be in the appropriate hash bucket.
328
328
        for entry_idx, text_offset, hash_val in just_entries:
329
329
            hash_idx = hash_val & 0xf
330
330
            self.assertTrue(
331
 
                hash_list[hash_idx] <= entry_idx < hash_list[hash_idx+1])
 
331
                hash_list[hash_idx] <= entry_idx < hash_list[hash_idx + 1])
332
332
 
333
333
    def test_first_add_source_doesnt_index_until_make_delta(self):
334
334
        di = self._gc_module.DeltaIndex()
343
343
 
344
344
    def test_add_source_max_bytes_to_index(self):
345
345
        di = self._gc_module.DeltaIndex()
346
 
        di._max_bytes_to_index = 3*16
347
 
        di.add_source(_text1, 0) # (77 bytes -1) // 3 = 25 byte stride
348
 
        di.add_source(_text3, 3) # (135 bytes -1) // 3 = 44 byte stride
 
346
        di._max_bytes_to_index = 3 * 16
 
347
        di.add_source(_text1, 0)  # (77 bytes -1) // 3 = 25 byte stride
 
348
        di.add_source(_text3, 3)  # (135 bytes -1) // 3 = 44 byte stride
349
349
        start2 = len(_text1) + 3
350
350
        hash_list, entry_list = di._dump_index()
351
351
        self.assertEqual(16, len(hash_list))
352
352
        self.assertEqual(67, len(entry_list))
353
353
        just_entries = sorted([(text_offset, hash_val)
354
354
                               for text_offset, hash_val in entry_list
355
 
                                if text_offset != 0 or hash_val != 0])
 
355
                               if text_offset != 0 or hash_val != 0])
356
356
        rabin_hash = self._gc_module._rabin_hash
357
357
        self.assertEqual([(25, rabin_hash(_text1[10:26])),
358
358
                          (50, rabin_hash(_text1[35:51])),
359
359
                          (75, rabin_hash(_text1[60:76])),
360
 
                          (start2+44, rabin_hash(_text3[29:45])),
361
 
                          (start2+88, rabin_hash(_text3[73:89])),
362
 
                          (start2+132, rabin_hash(_text3[117:133])),
363
 
                         ], just_entries)
 
360
                          (start2 + 44, rabin_hash(_text3[29:45])),
 
361
                          (start2 + 88, rabin_hash(_text3[73:89])),
 
362
                          (start2 + 132, rabin_hash(_text3[117:133])),
 
363
                          ], just_entries)
364
364
 
365
365
    def test_second_add_source_triggers_make_index(self):
366
366
        di = self._gc_module.DeltaIndex()
467
467
        self.assertEqual((exp_offset, exp_length, exp_newpos), out)
468
468
 
469
469
    def test_encode_no_length(self):
470
 
        self.assertEncode(b'\x80', 0, 64*1024)
471
 
        self.assertEncode(b'\x81\x01', 1, 64*1024)
472
 
        self.assertEncode(b'\x81\x0a', 10, 64*1024)
473
 
        self.assertEncode(b'\x81\xff', 255, 64*1024)
474
 
        self.assertEncode(b'\x82\x01', 256, 64*1024)
475
 
        self.assertEncode(b'\x83\x01\x01', 257, 64*1024)
476
 
        self.assertEncode(b'\x8F\xff\xff\xff\xff', 0xFFFFFFFF, 64*1024)
477
 
        self.assertEncode(b'\x8E\xff\xff\xff', 0xFFFFFF00, 64*1024)
478
 
        self.assertEncode(b'\x8D\xff\xff\xff', 0xFFFF00FF, 64*1024)
479
 
        self.assertEncode(b'\x8B\xff\xff\xff', 0xFF00FFFF, 64*1024)
480
 
        self.assertEncode(b'\x87\xff\xff\xff', 0x00FFFFFF, 64*1024)
481
 
        self.assertEncode(b'\x8F\x04\x03\x02\x01', 0x01020304, 64*1024)
 
470
        self.assertEncode(b'\x80', 0, 64 * 1024)
 
471
        self.assertEncode(b'\x81\x01', 1, 64 * 1024)
 
472
        self.assertEncode(b'\x81\x0a', 10, 64 * 1024)
 
473
        self.assertEncode(b'\x81\xff', 255, 64 * 1024)
 
474
        self.assertEncode(b'\x82\x01', 256, 64 * 1024)
 
475
        self.assertEncode(b'\x83\x01\x01', 257, 64 * 1024)
 
476
        self.assertEncode(b'\x8F\xff\xff\xff\xff', 0xFFFFFFFF, 64 * 1024)
 
477
        self.assertEncode(b'\x8E\xff\xff\xff', 0xFFFFFF00, 64 * 1024)
 
478
        self.assertEncode(b'\x8D\xff\xff\xff', 0xFFFF00FF, 64 * 1024)
 
479
        self.assertEncode(b'\x8B\xff\xff\xff', 0xFF00FFFF, 64 * 1024)
 
480
        self.assertEncode(b'\x87\xff\xff\xff', 0x00FFFFFF, 64 * 1024)
 
481
        self.assertEncode(b'\x8F\x04\x03\x02\x01', 0x01020304, 64 * 1024)
482
482
 
483
483
    def test_encode_no_offset(self):
484
484
        self.assertEncode(b'\x90\x01', 0, 1)
490
490
        # Special case, if copy == 64KiB, then we store exactly 0
491
491
        # Note that this puns with a copy of exactly 0 bytes, but we don't care
492
492
        # about that, as we would never actually copy 0 bytes
493
 
        self.assertEncode(b'\x80', 0, 64*1024)
 
493
        self.assertEncode(b'\x80', 0, 64 * 1024)
494
494
 
495
495
    def test_encode(self):
496
496
        self.assertEncode(b'\x91\x01\x01', 1, 1)
502
502
        # Special case, if copy == 64KiB, then we store exactly 0
503
503
        # Note that this puns with a copy of exactly 0 bytes, but we don't care
504
504
        # about that, as we would never actually copy 0 bytes
505
 
        self.assertEncode(b'\x81\x0a', 10, 64*1024)
 
505
        self.assertEncode(b'\x81\x0a', 10, 64 * 1024)
506
506
 
507
507
    def test_decode_no_length(self):
508
508
        # If length is 0, it is interpreted as 64KiB
550
550
 
551
551
    scenarios = module_scenarios()
552
552
 
553
 
    _gc_module = None # Set by load_tests
 
553
    _gc_module = None  # Set by load_tests
554
554
 
555
555
    def assertEqualEncode(self, bytes, val):
556
556
        self.assertEqual(bytes, self._gc_module.encode_base128_int(val))
582
582
        self.assertEqualDecode(127, 1, b'\x7f\x01')
583
583
        self.assertEqualDecode(128, 2, b'\x80\x01abcdef')
584
584
        self.assertEqualDecode(255, 2, b'\xff\x01\xff')
585
 
 
586