/brz/remove-bazaar

To get this branch, use:
bzr branch http://gegoxaren.bato24.eu/bzr/brz/remove-bazaar

« back to all changes in this revision

Viewing changes to bzrlib/tests/test__groupcompress.py

  • Committer: Gordon Tyler
  • Date: 2011-06-30 21:00:38 UTC
  • mto: This revision was merged to the branch mainline in revision 6007.
  • Revision ID: gordon@doxxx.net-20110630210038-bzscps46jgcqtkr0
Use known executables for win32 and other platforms in test_exe_on_path.

Show diffs side-by-side

added added

removed removed

Lines of Context:
1
 
# Copyright (C) 2008, 2009, 2010 Canonical Ltd
 
1
# Copyright (C) 2008-2011 Canonical Ltd
2
2
#
3
3
# This program is free software; you can redistribute it and/or modify
4
4
# it under the terms of the GNU General Public License as published by
17
17
"""Tests for the python and pyrex extensions of groupcompress"""
18
18
 
19
19
from bzrlib import (
20
 
    groupcompress,
21
20
    _groupcompress_py,
22
21
    tests,
23
22
    )
24
 
 
25
 
 
26
 
def load_tests(standard_tests, module, loader):
27
 
    """Parameterize tests for all versions of groupcompress."""
28
 
    two_way_scenarios = [
29
 
        ('PP', {'make_delta': _groupcompress_py.make_delta,
30
 
                'apply_delta': _groupcompress_py.apply_delta})
31
 
        ]
 
23
from bzrlib.tests.scenarios import (
 
24
    load_tests_apply_scenarios,
 
25
    )
 
26
 
 
27
 
 
28
def module_scenarios():
32
29
    scenarios = [
33
30
        ('python', {'_gc_module': _groupcompress_py}),
34
31
        ]
36
33
        gc_module = compiled_groupcompress_feature.module
37
34
        scenarios.append(('C',
38
35
            {'_gc_module': gc_module}))
39
 
        two_way_scenarios.extend([
 
36
    return scenarios
 
37
 
 
38
 
 
39
def two_way_scenarios():
 
40
    scenarios = [
 
41
        ('PP', {'make_delta': _groupcompress_py.make_delta,
 
42
                'apply_delta': _groupcompress_py.apply_delta})
 
43
        ]
 
44
    if compiled_groupcompress_feature.available():
 
45
        gc_module = compiled_groupcompress_feature.module
 
46
        scenarios.extend([
40
47
            ('CC', {'make_delta': gc_module.make_delta,
41
48
                    'apply_delta': gc_module.apply_delta}),
42
49
            ('PC', {'make_delta': _groupcompress_py.make_delta,
44
51
            ('CP', {'make_delta': gc_module.make_delta,
45
52
                    'apply_delta': _groupcompress_py.apply_delta}),
46
53
            ])
47
 
    to_adapt, result = tests.split_suite_by_condition(
48
 
        standard_tests, tests.condition_isinstance((TestMakeAndApplyDelta,
49
 
                                                    TestBase128Int)))
50
 
    result = tests.multiply_tests(to_adapt, scenarios, result)
51
 
    to_adapt, result = tests.split_suite_by_condition(result,
52
 
        tests.condition_isinstance(TestMakeAndApplyCompatible))
53
 
    result = tests.multiply_tests(to_adapt, two_way_scenarios, result)
54
 
    return result
 
54
    return scenarios
 
55
 
 
56
 
 
57
load_tests = load_tests_apply_scenarios
55
58
 
56
59
 
57
60
compiled_groupcompress_feature = tests.ModuleAvailableFeature(
116
119
 
117
120
class TestMakeAndApplyDelta(tests.TestCase):
118
121
 
 
122
    scenarios = module_scenarios()
119
123
    _gc_module = None # Set by load_tests
120
124
 
121
125
    def setUp(self):
226
230
 
227
231
class TestMakeAndApplyCompatible(tests.TestCase):
228
232
 
 
233
    scenarios = two_way_scenarios()
 
234
 
229
235
    make_delta = None # Set by load_tests
230
236
    apply_delta = None # Set by load_tests
231
237
 
258
264
        di = self._gc_module.DeltaIndex('test text\n')
259
265
        self.assertEqual('DeltaIndex(1, 10)', repr(di))
260
266
 
 
267
    def test__dump_no_index(self):
 
268
        di = self._gc_module.DeltaIndex()
 
269
        self.assertEqual(None, di._dump_index())
 
270
 
 
271
    def test__dump_index_simple(self):
 
272
        di = self._gc_module.DeltaIndex()
 
273
        di.add_source(_text1, 0)
 
274
        self.assertFalse(di._has_index())
 
275
        self.assertEqual(None, di._dump_index())
 
276
        _ = di.make_delta(_text1)
 
277
        self.assertTrue(di._has_index())
 
278
        hash_list, entry_list = di._dump_index()
 
279
        self.assertEqual(16, len(hash_list))
 
280
        self.assertEqual(68, len(entry_list))
 
281
        just_entries = [(idx, text_offset, hash_val)
 
282
                        for idx, (text_offset, hash_val)
 
283
                         in enumerate(entry_list)
 
284
                         if text_offset != 0 or hash_val != 0]
 
285
        rabin_hash = self._gc_module._rabin_hash
 
286
        self.assertEqual([(8, 16, rabin_hash(_text1[1:17])),
 
287
                          (25, 48, rabin_hash(_text1[33:49])),
 
288
                          (34, 32, rabin_hash(_text1[17:33])),
 
289
                          (47, 64, rabin_hash(_text1[49:65])),
 
290
                         ], just_entries)
 
291
        # This ensures that the hash map points to the location we expect it to
 
292
        for entry_idx, text_offset, hash_val in just_entries:
 
293
            self.assertEqual(entry_idx, hash_list[hash_val & 0xf])
 
294
 
 
295
    def test__dump_index_two_sources(self):
 
296
        di = self._gc_module.DeltaIndex()
 
297
        di.add_source(_text1, 0)
 
298
        di.add_source(_text2, 2)
 
299
        start2 = len(_text1) + 2
 
300
        self.assertTrue(di._has_index())
 
301
        hash_list, entry_list = di._dump_index()
 
302
        self.assertEqual(16, len(hash_list))
 
303
        self.assertEqual(68, len(entry_list))
 
304
        just_entries = [(idx, text_offset, hash_val)
 
305
                        for idx, (text_offset, hash_val)
 
306
                         in enumerate(entry_list)
 
307
                         if text_offset != 0 or hash_val != 0]
 
308
        rabin_hash = self._gc_module._rabin_hash
 
309
        self.assertEqual([(8, 16, rabin_hash(_text1[1:17])),
 
310
                          (9, start2+16, rabin_hash(_text2[1:17])),
 
311
                          (25, 48, rabin_hash(_text1[33:49])),
 
312
                          (30, start2+64, rabin_hash(_text2[49:65])),
 
313
                          (34, 32, rabin_hash(_text1[17:33])),
 
314
                          (35, start2+32, rabin_hash(_text2[17:33])),
 
315
                          (43, start2+48, rabin_hash(_text2[33:49])),
 
316
                          (47, 64, rabin_hash(_text1[49:65])),
 
317
                         ], just_entries)
 
318
        # Each entry should be in the appropriate hash bucket.
 
319
        for entry_idx, text_offset, hash_val in just_entries:
 
320
            hash_idx = hash_val & 0xf
 
321
            self.assertTrue(
 
322
                hash_list[hash_idx] <= entry_idx < hash_list[hash_idx+1])
 
323
 
261
324
    def test_first_add_source_doesnt_index_until_make_delta(self):
262
325
        di = self._gc_module.DeltaIndex()
263
326
        self.assertFalse(di._has_index())
269
332
        self.assertTrue(di._has_index())
270
333
        self.assertEqual('N\x90/\x1fdiffer from\nagainst other text\n', delta)
271
334
 
 
335
    def test_add_source_max_bytes_to_index(self):
 
336
        di = self._gc_module.DeltaIndex()
 
337
        di._max_bytes_to_index = 3*16
 
338
        di.add_source(_text1, 0) # (77 bytes -1) // 3 = 25 byte stride
 
339
        di.add_source(_text3, 3) # (135 bytes -1) // 3 = 44 byte stride
 
340
        start2 = len(_text1) + 3
 
341
        hash_list, entry_list = di._dump_index()
 
342
        self.assertEqual(16, len(hash_list))
 
343
        self.assertEqual(67, len(entry_list))
 
344
        just_entries = sorted([(text_offset, hash_val)
 
345
                               for text_offset, hash_val in entry_list
 
346
                                if text_offset != 0 or hash_val != 0])
 
347
        rabin_hash = self._gc_module._rabin_hash
 
348
        self.assertEqual([(25, rabin_hash(_text1[10:26])),
 
349
                          (50, rabin_hash(_text1[35:51])),
 
350
                          (75, rabin_hash(_text1[60:76])),
 
351
                          (start2+44, rabin_hash(_text3[29:45])),
 
352
                          (start2+88, rabin_hash(_text3[73:89])),
 
353
                          (start2+132, rabin_hash(_text3[117:133])),
 
354
                         ], just_entries)
 
355
 
272
356
    def test_second_add_source_triggers_make_index(self):
273
357
        di = self._gc_module.DeltaIndex()
274
358
        self.assertFalse(di._has_index())
457
541
 
458
542
class TestBase128Int(tests.TestCase):
459
543
 
 
544
    scenarios = module_scenarios()
 
545
 
460
546
    _gc_module = None # Set by load_tests
461
547
 
462
548
    def assertEqualEncode(self, bytes, val):