/brz/remove-bazaar

To get this branch, use:
bzr branch http://gegoxaren.bato24.eu/bzr/brz/remove-bazaar

« back to all changes in this revision

Viewing changes to brzlib/tests/test__groupcompress.py

  • Committer: Jelmer Vernooij
  • Date: 2017-05-21 12:41:27 UTC
  • mto: This revision was merged to the branch mainline in revision 6623.
  • Revision ID: jelmer@jelmer.uk-20170521124127-iv8etg0vwymyai6y
s/bzr/brz/ in apport config.

Show diffs side-by-side

added

removed

Lines of Context:
1
 
# Copyright (C) 2008, 2009, 2010 Canonical Ltd
 
1
# Copyright (C) 2008-2011 Canonical Ltd
2
2
#
3
3
# This program is free software; you can redistribute it and/or modify
4
4
# it under the terms of the GNU General Public License as published by
16
16
 
17
17
"""Tests for the python and pyrex extensions of groupcompress"""
18
18
 
19
 
from bzrlib import (
20
 
    groupcompress,
 
19
from brzlib import (
21
20
    _groupcompress_py,
22
21
    tests,
23
22
    )
24
 
 
25
 
 
26
 
def load_tests(standard_tests, module, loader):
27
 
    """Parameterize tests for all versions of groupcompress."""
28
 
    two_way_scenarios = [
29
 
        ('PP', {'make_delta': _groupcompress_py.make_delta,
30
 
                'apply_delta': _groupcompress_py.apply_delta})
31
 
        ]
 
23
from brzlib.tests.scenarios import (
 
24
    load_tests_apply_scenarios,
 
25
    )
 
26
from brzlib.tests import (
 
27
    features,
 
28
    )
 
29
 
 
30
 
 
31
def module_scenarios():
32
32
    scenarios = [
33
33
        ('python', {'_gc_module': _groupcompress_py}),
34
34
        ]
36
36
        gc_module = compiled_groupcompress_feature.module
37
37
        scenarios.append(('C',
38
38
            {'_gc_module': gc_module}))
39
 
        two_way_scenarios.extend([
 
39
    return scenarios
 
40
 
 
41
 
 
42
def two_way_scenarios():
 
43
    scenarios = [
 
44
        ('PP', {'make_delta': _groupcompress_py.make_delta,
 
45
                'apply_delta': _groupcompress_py.apply_delta})
 
46
        ]
 
47
    if compiled_groupcompress_feature.available():
 
48
        gc_module = compiled_groupcompress_feature.module
 
49
        scenarios.extend([
40
50
            ('CC', {'make_delta': gc_module.make_delta,
41
51
                    'apply_delta': gc_module.apply_delta}),
42
52
            ('PC', {'make_delta': _groupcompress_py.make_delta,
44
54
            ('CP', {'make_delta': gc_module.make_delta,
45
55
                    'apply_delta': _groupcompress_py.apply_delta}),
46
56
            ])
47
 
    to_adapt, result = tests.split_suite_by_condition(
48
 
        standard_tests, tests.condition_isinstance((TestMakeAndApplyDelta,
49
 
                                                    TestBase128Int)))
50
 
    result = tests.multiply_tests(to_adapt, scenarios, result)
51
 
    to_adapt, result = tests.split_suite_by_condition(result,
52
 
        tests.condition_isinstance(TestMakeAndApplyCompatible))
53
 
    result = tests.multiply_tests(to_adapt, two_way_scenarios, result)
54
 
    return result
55
 
 
56
 
 
57
 
compiled_groupcompress_feature = tests.ModuleAvailableFeature(
58
 
                                    'bzrlib._groupcompress_pyx')
 
57
    return scenarios
 
58
 
 
59
 
 
60
load_tests = load_tests_apply_scenarios
 
61
 
 
62
 
 
63
compiled_groupcompress_feature = features.ModuleAvailableFeature(
 
64
    'brzlib._groupcompress_pyx')
59
65
 
60
66
_text1 = """\
61
67
This is a bit
116
122
 
117
123
class TestMakeAndApplyDelta(tests.TestCase):
118
124
 
 
125
    scenarios = module_scenarios()
119
126
    _gc_module = None # Set by load_tests
120
127
 
121
128
    def setUp(self):
226
233
 
227
234
class TestMakeAndApplyCompatible(tests.TestCase):
228
235
 
 
236
    scenarios = two_way_scenarios()
 
237
 
229
238
    make_delta = None # Set by load_tests
230
239
    apply_delta = None # Set by load_tests
231
240
 
258
267
        di = self._gc_module.DeltaIndex('test text\n')
259
268
        self.assertEqual('DeltaIndex(1, 10)', repr(di))
260
269
 
 
270
    def test__dump_no_index(self):
 
271
        di = self._gc_module.DeltaIndex()
 
272
        self.assertEqual(None, di._dump_index())
 
273
 
 
274
    def test__dump_index_simple(self):
 
275
        di = self._gc_module.DeltaIndex()
 
276
        di.add_source(_text1, 0)
 
277
        self.assertFalse(di._has_index())
 
278
        self.assertEqual(None, di._dump_index())
 
279
        _ = di.make_delta(_text1)
 
280
        self.assertTrue(di._has_index())
 
281
        hash_list, entry_list = di._dump_index()
 
282
        self.assertEqual(16, len(hash_list))
 
283
        self.assertEqual(68, len(entry_list))
 
284
        just_entries = [(idx, text_offset, hash_val)
 
285
                        for idx, (text_offset, hash_val)
 
286
                         in enumerate(entry_list)
 
287
                         if text_offset != 0 or hash_val != 0]
 
288
        rabin_hash = self._gc_module._rabin_hash
 
289
        self.assertEqual([(8, 16, rabin_hash(_text1[1:17])),
 
290
                          (25, 48, rabin_hash(_text1[33:49])),
 
291
                          (34, 32, rabin_hash(_text1[17:33])),
 
292
                          (47, 64, rabin_hash(_text1[49:65])),
 
293
                         ], just_entries)
 
294
        # This ensures that the hash map points to the location we expect it to
 
295
        for entry_idx, text_offset, hash_val in just_entries:
 
296
            self.assertEqual(entry_idx, hash_list[hash_val & 0xf])
 
297
 
 
298
    def test__dump_index_two_sources(self):
 
299
        di = self._gc_module.DeltaIndex()
 
300
        di.add_source(_text1, 0)
 
301
        di.add_source(_text2, 2)
 
302
        start2 = len(_text1) + 2
 
303
        self.assertTrue(di._has_index())
 
304
        hash_list, entry_list = di._dump_index()
 
305
        self.assertEqual(16, len(hash_list))
 
306
        self.assertEqual(68, len(entry_list))
 
307
        just_entries = [(idx, text_offset, hash_val)
 
308
                        for idx, (text_offset, hash_val)
 
309
                         in enumerate(entry_list)
 
310
                         if text_offset != 0 or hash_val != 0]
 
311
        rabin_hash = self._gc_module._rabin_hash
 
312
        self.assertEqual([(8, 16, rabin_hash(_text1[1:17])),
 
313
                          (9, start2+16, rabin_hash(_text2[1:17])),
 
314
                          (25, 48, rabin_hash(_text1[33:49])),
 
315
                          (30, start2+64, rabin_hash(_text2[49:65])),
 
316
                          (34, 32, rabin_hash(_text1[17:33])),
 
317
                          (35, start2+32, rabin_hash(_text2[17:33])),
 
318
                          (43, start2+48, rabin_hash(_text2[33:49])),
 
319
                          (47, 64, rabin_hash(_text1[49:65])),
 
320
                         ], just_entries)
 
321
        # Each entry should be in the appropriate hash bucket.
 
322
        for entry_idx, text_offset, hash_val in just_entries:
 
323
            hash_idx = hash_val & 0xf
 
324
            self.assertTrue(
 
325
                hash_list[hash_idx] <= entry_idx < hash_list[hash_idx+1])
 
326
 
261
327
    def test_first_add_source_doesnt_index_until_make_delta(self):
262
328
        di = self._gc_module.DeltaIndex()
263
329
        self.assertFalse(di._has_index())
269
335
        self.assertTrue(di._has_index())
270
336
        self.assertEqual('N\x90/\x1fdiffer from\nagainst other text\n', delta)
271
337
 
 
338
    def test_add_source_max_bytes_to_index(self):
 
339
        di = self._gc_module.DeltaIndex()
 
340
        di._max_bytes_to_index = 3*16
 
341
        di.add_source(_text1, 0) # (77 bytes -1) // 3 = 25 byte stride
 
342
        di.add_source(_text3, 3) # (135 bytes -1) // 3 = 44 byte stride
 
343
        start2 = len(_text1) + 3
 
344
        hash_list, entry_list = di._dump_index()
 
345
        self.assertEqual(16, len(hash_list))
 
346
        self.assertEqual(67, len(entry_list))
 
347
        just_entries = sorted([(text_offset, hash_val)
 
348
                               for text_offset, hash_val in entry_list
 
349
                                if text_offset != 0 or hash_val != 0])
 
350
        rabin_hash = self._gc_module._rabin_hash
 
351
        self.assertEqual([(25, rabin_hash(_text1[10:26])),
 
352
                          (50, rabin_hash(_text1[35:51])),
 
353
                          (75, rabin_hash(_text1[60:76])),
 
354
                          (start2+44, rabin_hash(_text3[29:45])),
 
355
                          (start2+88, rabin_hash(_text3[73:89])),
 
356
                          (start2+132, rabin_hash(_text3[117:133])),
 
357
                         ], just_entries)
 
358
 
272
359
    def test_second_add_source_triggers_make_index(self):
273
360
        di = self._gc_module.DeltaIndex()
274
361
        self.assertFalse(di._has_index())
457
544
 
458
545
class TestBase128Int(tests.TestCase):
459
546
 
 
547
    scenarios = module_scenarios()
 
548
 
460
549
    _gc_module = None # Set by load_tests
461
550
 
462
551
    def assertEqualEncode(self, bytes, val):