/brz/remove-bazaar

To get this branch, use:
bzr branch http://gegoxaren.bato24.eu/bzr/brz/remove-bazaar

« back to all changes in this revision

Viewing changes to breezy/tests/test__groupcompress.py

  • Committer: Jelmer Vernooij
  • Date: 2020-02-18 01:57:45 UTC
  • mto: This revision was merged to the branch mainline in revision 7493.
  • Revision ID: jelmer@jelmer.uk-20200218015745-q2ss9tsk74h4nh61
drop use of future.

Show diffs side-by-side

added added

removed removed

Lines of Context:
1
 
# Copyright (C) 2008, 2009, 2010 Canonical Ltd
 
1
# Copyright (C) 2008-2011 Canonical Ltd
2
2
#
3
3
# This program is free software; you can redistribute it and/or modify
4
4
# it under the terms of the GNU General Public License as published by
16
16
 
17
17
"""Tests for the python and pyrex extensions of groupcompress"""
18
18
 
19
 
from bzrlib import (
20
 
    groupcompress,
 
19
import sys
 
20
 
 
21
from .. import (
 
22
    tests,
 
23
    )
 
24
from ..bzr import (
21
25
    _groupcompress_py,
22
 
    tests,
23
 
    )
24
 
 
25
 
 
26
 
def load_tests(standard_tests, module, loader):
27
 
    """Parameterize tests for all versions of groupcompress."""
28
 
    two_way_scenarios = [
 
26
    )
 
27
from .scenarios import (
 
28
    load_tests_apply_scenarios,
 
29
    )
 
30
from . import (
 
31
    features,
 
32
    )
 
33
 
 
34
 
 
35
def module_scenarios():
 
36
    scenarios = [
 
37
        ('python', {'_gc_module': _groupcompress_py}),
 
38
        ]
 
39
    if compiled_groupcompress_feature.available():
 
40
        gc_module = compiled_groupcompress_feature.module
 
41
        scenarios.append(('C',
 
42
                          {'_gc_module': gc_module}))
 
43
    return scenarios
 
44
 
 
45
 
 
46
def two_way_scenarios():
 
47
    scenarios = [
29
48
        ('PP', {'make_delta': _groupcompress_py.make_delta,
30
49
                'apply_delta': _groupcompress_py.apply_delta})
31
50
        ]
32
 
    scenarios = [
33
 
        ('python', {'_gc_module': _groupcompress_py}),
34
 
        ]
35
51
    if compiled_groupcompress_feature.available():
36
52
        gc_module = compiled_groupcompress_feature.module
37
 
        scenarios.append(('C',
38
 
            {'_gc_module': gc_module}))
39
 
        two_way_scenarios.extend([
 
53
        scenarios.extend([
40
54
            ('CC', {'make_delta': gc_module.make_delta,
41
55
                    'apply_delta': gc_module.apply_delta}),
42
56
            ('PC', {'make_delta': _groupcompress_py.make_delta,
44
58
            ('CP', {'make_delta': gc_module.make_delta,
45
59
                    'apply_delta': _groupcompress_py.apply_delta}),
46
60
            ])
47
 
    to_adapt, result = tests.split_suite_by_condition(
48
 
        standard_tests, tests.condition_isinstance((TestMakeAndApplyDelta,
49
 
                                                    TestBase128Int)))
50
 
    result = tests.multiply_tests(to_adapt, scenarios, result)
51
 
    to_adapt, result = tests.split_suite_by_condition(result,
52
 
        tests.condition_isinstance(TestMakeAndApplyCompatible))
53
 
    result = tests.multiply_tests(to_adapt, two_way_scenarios, result)
54
 
    return result
55
 
 
56
 
 
57
 
compiled_groupcompress_feature = tests.ModuleAvailableFeature(
58
 
                                    'bzrlib._groupcompress_pyx')
59
 
 
60
 
_text1 = """\
 
61
    return scenarios
 
62
 
 
63
 
 
64
load_tests = load_tests_apply_scenarios
 
65
 
 
66
 
 
67
compiled_groupcompress_feature = features.ModuleAvailableFeature(
 
68
    'breezy.bzr._groupcompress_pyx')
 
69
 
 
70
_text1 = b"""\
61
71
This is a bit
62
72
of source text
63
73
which is meant to be matched
64
74
against other text
65
75
"""
66
76
 
67
 
_text2 = """\
 
77
_text2 = b"""\
68
78
This is a bit
69
79
of source text
70
80
which is meant to differ from
71
81
against other text
72
82
"""
73
83
 
74
 
_text3 = """\
 
84
_text3 = b"""\
75
85
This is a bit
76
86
of source text
77
87
which is meant to be matched
81
91
at the end of the file
82
92
"""
83
93
 
84
 
_first_text = """\
 
94
_first_text = b"""\
85
95
a bit of text, that
86
96
does not have much in
87
97
common with the next text
88
98
"""
89
99
 
90
 
_second_text = """\
 
100
_second_text = b"""\
91
101
some more bit of text, that
92
102
does not have much in
93
103
common with the previous text
95
105
"""
96
106
 
97
107
 
98
 
_third_text = """\
 
108
_third_text = b"""\
99
109
a bit of text, that
100
110
has some in common with the previous text
101
111
and has some extra text
103
113
common with the next text
104
114
"""
105
115
 
106
 
_fourth_text = """\
 
116
_fourth_text = b"""\
107
117
123456789012345
108
118
same rabin hash
109
119
123456789012345
114
124
same rabin hash
115
125
"""
116
126
 
 
127
 
117
128
class TestMakeAndApplyDelta(tests.TestCase):
118
129
 
119
 
    _gc_module = None # Set by load_tests
 
130
    scenarios = module_scenarios()
 
131
    _gc_module = None  # Set by load_tests
120
132
 
121
133
    def setUp(self):
122
134
        super(TestMakeAndApplyDelta, self).setUp()
125
137
        self.apply_delta_to_source = self._gc_module.apply_delta_to_source
126
138
 
127
139
    def test_make_delta_is_typesafe(self):
128
 
        self.make_delta('a string', 'another string')
 
140
        self.make_delta(b'a string', b'another string')
129
141
 
130
142
        def _check_make_delta(string1, string2):
131
143
            self.assertRaises(TypeError, self.make_delta, string1, string2)
132
144
 
133
 
        _check_make_delta('a string', object())
134
 
        _check_make_delta('a string', u'not a string')
135
 
        _check_make_delta(object(), 'a string')
136
 
        _check_make_delta(u'not a string', 'a string')
 
145
        _check_make_delta(b'a string', object())
 
146
        _check_make_delta(b'a string', u'not a string')
 
147
        _check_make_delta(object(), b'a string')
 
148
        _check_make_delta(u'not a string', b'a string')
137
149
 
138
150
    def test_make_noop_delta(self):
139
151
        ident_delta = self.make_delta(_text1, _text1)
140
 
        self.assertEqual('M\x90M', ident_delta)
 
152
        self.assertEqual(b'M\x90M', ident_delta)
141
153
        ident_delta = self.make_delta(_text2, _text2)
142
 
        self.assertEqual('N\x90N', ident_delta)
 
154
        self.assertEqual(b'N\x90N', ident_delta)
143
155
        ident_delta = self.make_delta(_text3, _text3)
144
 
        self.assertEqual('\x87\x01\x90\x87', ident_delta)
 
156
        self.assertEqual(b'\x87\x01\x90\x87', ident_delta)
145
157
 
146
158
    def assertDeltaIn(self, delta1, delta2, delta):
147
159
        """Make sure that the delta bytes match one of the expectations."""
148
160
        # In general, the python delta matcher gives different results than the
149
161
        # pyrex delta matcher. Both should be valid deltas, though.
150
162
        if delta not in (delta1, delta2):
151
 
            self.fail("Delta bytes:\n"
152
 
                      "       %r\n"
153
 
                      "not in %r\n"
154
 
                      "    or %r"
 
163
            self.fail(b"Delta bytes:\n"
 
164
                      b"       %r\n"
 
165
                      b"not in %r\n"
 
166
                      b"    or %r"
155
167
                      % (delta, delta1, delta2))
156
168
 
157
169
    def test_make_delta(self):
158
170
        delta = self.make_delta(_text1, _text2)
159
171
        self.assertDeltaIn(
160
 
            'N\x90/\x1fdiffer from\nagainst other text\n',
161
 
            'N\x90\x1d\x1ewhich is meant to differ from\n\x91:\x13',
 
172
            b'N\x90/\x1fdiffer from\nagainst other text\n',
 
173
            b'N\x90\x1d\x1ewhich is meant to differ from\n\x91:\x13',
162
174
            delta)
163
175
        delta = self.make_delta(_text2, _text1)
164
176
        self.assertDeltaIn(
165
 
            'M\x90/\x1ebe matched\nagainst other text\n',
166
 
            'M\x90\x1d\x1dwhich is meant to be matched\n\x91;\x13',
 
177
            b'M\x90/\x1ebe matched\nagainst other text\n',
 
178
            b'M\x90\x1d\x1dwhich is meant to be matched\n\x91;\x13',
167
179
            delta)
168
180
        delta = self.make_delta(_text3, _text1)
169
 
        self.assertEqual('M\x90M', delta)
 
181
        self.assertEqual(b'M\x90M', delta)
170
182
        delta = self.make_delta(_text3, _text2)
171
183
        self.assertDeltaIn(
172
 
            'N\x90/\x1fdiffer from\nagainst other text\n',
173
 
            'N\x90\x1d\x1ewhich is meant to differ from\n\x91:\x13',
 
184
            b'N\x90/\x1fdiffer from\nagainst other text\n',
 
185
            b'N\x90\x1d\x1ewhich is meant to differ from\n\x91:\x13',
174
186
            delta)
175
187
 
176
188
    def test_make_delta_with_large_copies(self):
179
191
        big_text = _text3 * 1220
180
192
        delta = self.make_delta(big_text, big_text)
181
193
        self.assertDeltaIn(
182
 
            '\xdc\x86\x0a'      # Encoding the length of the uncompressed text
183
 
            '\x80'              # Copy 64kB, starting at byte 0
184
 
            '\x84\x01'          # and another 64kB starting at 64kB
185
 
            '\xb4\x02\x5c\x83', # And the bit of tail.
 
194
            b'\xdc\x86\x0a'      # Encoding the length of the uncompressed text
 
195
            b'\x80'              # Copy 64kB, starting at byte 0
 
196
            b'\x84\x01'          # and another 64kB starting at 64kB
 
197
            b'\xb4\x02\x5c\x83',  # And the bit of tail.
186
198
            None,   # Both implementations should be identical
187
199
            delta)
188
200
 
189
201
    def test_apply_delta_is_typesafe(self):
190
 
        self.apply_delta(_text1, 'M\x90M')
191
 
        self.assertRaises(TypeError, self.apply_delta, object(), 'M\x90M')
 
202
        self.apply_delta(_text1, b'M\x90M')
 
203
        self.assertRaises(TypeError, self.apply_delta, object(), b'M\x90M')
192
204
        self.assertRaises(TypeError, self.apply_delta,
193
 
                          unicode(_text1), 'M\x90M')
 
205
                          _text1.decode('latin1'), b'M\x90M')
194
206
        self.assertRaises(TypeError, self.apply_delta, _text1, u'M\x90M')
195
207
        self.assertRaises(TypeError, self.apply_delta, _text1, object())
196
208
 
197
209
    def test_apply_delta(self):
198
210
        target = self.apply_delta(_text1,
199
 
                    'N\x90/\x1fdiffer from\nagainst other text\n')
 
211
                                  b'N\x90/\x1fdiffer from\nagainst other text\n')
200
212
        self.assertEqual(_text2, target)
201
213
        target = self.apply_delta(_text2,
202
 
                    'M\x90/\x1ebe matched\nagainst other text\n')
 
214
                                  b'M\x90/\x1ebe matched\nagainst other text\n')
203
215
        self.assertEqual(_text1, target)
204
216
 
205
217
    def test_apply_delta_to_source_is_safe(self):
206
218
        self.assertRaises(TypeError,
207
 
            self.apply_delta_to_source, object(), 0, 1)
 
219
                          self.apply_delta_to_source, object(), 0, 1)
208
220
        self.assertRaises(TypeError,
209
 
            self.apply_delta_to_source, u'unicode str', 0, 1)
 
221
                          self.apply_delta_to_source, u'unicode str', 0, 1)
210
222
        # end > length
211
223
        self.assertRaises(ValueError,
212
 
            self.apply_delta_to_source, 'foo', 1, 4)
 
224
                          self.apply_delta_to_source, b'foo', 1, 4)
213
225
        # start > length
214
226
        self.assertRaises(ValueError,
215
 
            self.apply_delta_to_source, 'foo', 5, 3)
 
227
                          self.apply_delta_to_source, b'foo', 5, 3)
216
228
        # start > end
217
229
        self.assertRaises(ValueError,
218
 
            self.apply_delta_to_source, 'foo', 3, 2)
 
230
                          self.apply_delta_to_source, b'foo', 3, 2)
219
231
 
220
232
    def test_apply_delta_to_source(self):
221
233
        source_and_delta = (_text1
222
 
                            + 'N\x90/\x1fdiffer from\nagainst other text\n')
 
234
                            + b'N\x90/\x1fdiffer from\nagainst other text\n')
223
235
        self.assertEqual(_text2, self.apply_delta_to_source(source_and_delta,
224
 
                                    len(_text1), len(source_and_delta)))
 
236
                                                            len(_text1), len(source_and_delta)))
225
237
 
226
238
 
227
239
class TestMakeAndApplyCompatible(tests.TestCase):
228
240
 
229
 
    make_delta = None # Set by load_tests
230
 
    apply_delta = None # Set by load_tests
 
241
    scenarios = two_way_scenarios()
 
242
 
 
243
    make_delta = None  # Set by load_tests
 
244
    apply_delta = None  # Set by load_tests
231
245
 
232
246
    def assertMakeAndApply(self, source, target):
233
247
        """Assert that generating a delta and applying gives success."""
255
269
        self._gc_module = compiled_groupcompress_feature.module
256
270
 
257
271
    def test_repr(self):
258
 
        di = self._gc_module.DeltaIndex('test text\n')
 
272
        di = self._gc_module.DeltaIndex(b'test text\n')
259
273
        self.assertEqual('DeltaIndex(1, 10)', repr(di))
260
274
 
 
275
    def test_sizeof(self):
 
276
        di = self._gc_module.DeltaIndex()
 
277
        # Exact value will depend on platform but should include sources
 
278
        # source_info is a pointer and two longs so at least 12 bytes
 
279
        lower_bound = di._max_num_sources * 12
 
280
        self.assertGreater(sys.getsizeof(di), lower_bound)
 
281
 
 
282
    def test__dump_no_index(self):
 
283
        di = self._gc_module.DeltaIndex()
 
284
        self.assertEqual(None, di._dump_index())
 
285
 
 
286
    def test__dump_index_simple(self):
 
287
        di = self._gc_module.DeltaIndex()
 
288
        di.add_source(_text1, 0)
 
289
        self.assertFalse(di._has_index())
 
290
        self.assertEqual(None, di._dump_index())
 
291
        _ = di.make_delta(_text1)
 
292
        self.assertTrue(di._has_index())
 
293
        hash_list, entry_list = di._dump_index()
 
294
        self.assertEqual(16, len(hash_list))
 
295
        self.assertEqual(68, len(entry_list))
 
296
        just_entries = [(idx, text_offset, hash_val)
 
297
                        for idx, (text_offset, hash_val)
 
298
                        in enumerate(entry_list)
 
299
                        if text_offset != 0 or hash_val != 0]
 
300
        rabin_hash = self._gc_module._rabin_hash
 
301
        self.assertEqual([(8, 16, rabin_hash(_text1[1:17])),
 
302
                          (25, 48, rabin_hash(_text1[33:49])),
 
303
                          (34, 32, rabin_hash(_text1[17:33])),
 
304
                          (47, 64, rabin_hash(_text1[49:65])),
 
305
                          ], just_entries)
 
306
        # This ensures that the hash map points to the location we expect it to
 
307
        for entry_idx, text_offset, hash_val in just_entries:
 
308
            self.assertEqual(entry_idx, hash_list[hash_val & 0xf])
 
309
 
 
310
    def test__dump_index_two_sources(self):
 
311
        di = self._gc_module.DeltaIndex()
 
312
        di.add_source(_text1, 0)
 
313
        di.add_source(_text2, 2)
 
314
        start2 = len(_text1) + 2
 
315
        self.assertTrue(di._has_index())
 
316
        hash_list, entry_list = di._dump_index()
 
317
        self.assertEqual(16, len(hash_list))
 
318
        self.assertEqual(68, len(entry_list))
 
319
        just_entries = [(idx, text_offset, hash_val)
 
320
                        for idx, (text_offset, hash_val)
 
321
                        in enumerate(entry_list)
 
322
                        if text_offset != 0 or hash_val != 0]
 
323
        rabin_hash = self._gc_module._rabin_hash
 
324
        self.assertEqual([(8, 16, rabin_hash(_text1[1:17])),
 
325
                          (9, start2 + 16, rabin_hash(_text2[1:17])),
 
326
                          (25, 48, rabin_hash(_text1[33:49])),
 
327
                          (30, start2 + 64, rabin_hash(_text2[49:65])),
 
328
                          (34, 32, rabin_hash(_text1[17:33])),
 
329
                          (35, start2 + 32, rabin_hash(_text2[17:33])),
 
330
                          (43, start2 + 48, rabin_hash(_text2[33:49])),
 
331
                          (47, 64, rabin_hash(_text1[49:65])),
 
332
                          ], just_entries)
 
333
        # Each entry should be in the appropriate hash bucket.
 
334
        for entry_idx, text_offset, hash_val in just_entries:
 
335
            hash_idx = hash_val & 0xf
 
336
            self.assertTrue(
 
337
                hash_list[hash_idx] <= entry_idx < hash_list[hash_idx + 1])
 
338
 
261
339
    def test_first_add_source_doesnt_index_until_make_delta(self):
262
340
        di = self._gc_module.DeltaIndex()
263
341
        self.assertFalse(di._has_index())
267
345
        # generated, and will generate a proper delta
268
346
        delta = di.make_delta(_text2)
269
347
        self.assertTrue(di._has_index())
270
 
        self.assertEqual('N\x90/\x1fdiffer from\nagainst other text\n', delta)
 
348
        self.assertEqual(b'N\x90/\x1fdiffer from\nagainst other text\n', delta)
 
349
 
 
350
    def test_add_source_max_bytes_to_index(self):
 
351
        di = self._gc_module.DeltaIndex()
 
352
        di._max_bytes_to_index = 3 * 16
 
353
        di.add_source(_text1, 0)  # (77 bytes -1) // 3 = 25 byte stride
 
354
        di.add_source(_text3, 3)  # (135 bytes -1) // 3 = 44 byte stride
 
355
        start2 = len(_text1) + 3
 
356
        hash_list, entry_list = di._dump_index()
 
357
        self.assertEqual(16, len(hash_list))
 
358
        self.assertEqual(67, len(entry_list))
 
359
        just_entries = sorted([(text_offset, hash_val)
 
360
                               for text_offset, hash_val in entry_list
 
361
                               if text_offset != 0 or hash_val != 0])
 
362
        rabin_hash = self._gc_module._rabin_hash
 
363
        self.assertEqual([(25, rabin_hash(_text1[10:26])),
 
364
                          (50, rabin_hash(_text1[35:51])),
 
365
                          (75, rabin_hash(_text1[60:76])),
 
366
                          (start2 + 44, rabin_hash(_text3[29:45])),
 
367
                          (start2 + 88, rabin_hash(_text3[73:89])),
 
368
                          (start2 + 132, rabin_hash(_text3[117:133])),
 
369
                          ], just_entries)
271
370
 
272
371
    def test_second_add_source_triggers_make_index(self):
273
372
        di = self._gc_module.DeltaIndex()
280
379
    def test_make_delta(self):
281
380
        di = self._gc_module.DeltaIndex(_text1)
282
381
        delta = di.make_delta(_text2)
283
 
        self.assertEqual('N\x90/\x1fdiffer from\nagainst other text\n', delta)
 
382
        self.assertEqual(b'N\x90/\x1fdiffer from\nagainst other text\n', delta)
284
383
 
285
384
    def test_delta_against_multiple_sources(self):
286
385
        di = self._gc_module.DeltaIndex()
292
391
        delta = di.make_delta(_third_text)
293
392
        result = self._gc_module.apply_delta(_first_text + _second_text, delta)
294
393
        self.assertEqualDiff(_third_text, result)
295
 
        self.assertEqual('\x85\x01\x90\x14\x0chas some in '
296
 
                         '\x91v6\x03and\x91d"\x91:\n', delta)
 
394
        self.assertEqual(b'\x85\x01\x90\x14\x0chas some in '
 
395
                         b'\x91v6\x03and\x91d"\x91:\n', delta)
297
396
 
298
397
    def test_delta_with_offsets(self):
299
398
        di = self._gc_module.DeltaIndex()
305
404
        delta = di.make_delta(_third_text)
306
405
        self.assertIsNot(None, delta)
307
406
        result = self._gc_module.apply_delta(
308
 
            '12345' + _first_text + '1234567890' + _second_text, delta)
 
407
            b'12345' + _first_text + b'1234567890' + _second_text, delta)
309
408
        self.assertIsNot(None, result)
310
409
        self.assertEqualDiff(_third_text, result)
311
 
        self.assertEqual('\x85\x01\x91\x05\x14\x0chas some in '
312
 
                         '\x91\x856\x03and\x91s"\x91?\n', delta)
 
410
        self.assertEqual(b'\x85\x01\x91\x05\x14\x0chas some in '
 
411
                         b'\x91\x856\x03and\x91s"\x91?\n', delta)
313
412
 
314
413
    def test_delta_with_delta_bytes(self):
315
414
        di = self._gc_module.DeltaIndex()
317
416
        di.add_source(_first_text, 0)
318
417
        self.assertEqual(len(_first_text), di._source_offset)
319
418
        delta = di.make_delta(_second_text)
320
 
        self.assertEqual('h\tsome more\x91\x019'
321
 
                         '&previous text\nand has some extra text\n', delta)
 
419
        self.assertEqual(b'h\tsome more\x91\x019'
 
420
                         b'&previous text\nand has some extra text\n', delta)
322
421
        di.add_delta_source(delta, 0)
323
422
        source += delta
324
423
        self.assertEqual(len(_first_text) + len(delta), di._source_offset)
330
429
        # Note that we don't match the 'common with the', because it isn't long
331
430
        # enough to match in the original text, and those bytes are not present
332
431
        # in the delta for the second text.
333
 
        self.assertEqual('\x85\x01\x90\x14\x1chas some in common with the '
334
 
                         '\x91S&\x03and\x91\x18,', second_delta)
 
432
        self.assertEqual(b'\x85\x01\x90\x14\x1chas some in common with the '
 
433
                         b'\x91S&\x03and\x91\x18,', second_delta)
335
434
        # Add this delta, and create a new delta for the same text. We should
336
435
        # find the remaining text, and only insert the short 'and' text.
337
436
        di.add_delta_source(second_delta, 0)
339
438
        third_delta = di.make_delta(_third_text)
340
439
        result = self._gc_module.apply_delta(source, third_delta)
341
440
        self.assertEqualDiff(_third_text, result)
342
 
        self.assertEqual('\x85\x01\x90\x14\x91\x7e\x1c'
343
 
                         '\x91S&\x03and\x91\x18,', third_delta)
 
441
        self.assertEqual(b'\x85\x01\x90\x14\x91\x7e\x1c'
 
442
                         b'\x91S&\x03and\x91\x18,', third_delta)
344
443
        # Now create a delta, which we know won't be able to be 'fit' into the
345
444
        # existing index
346
445
        fourth_delta = di.make_delta(_fourth_text)
347
446
        self.assertEqual(_fourth_text,
348
447
                         self._gc_module.apply_delta(source, fourth_delta))
349
 
        self.assertEqual('\x80\x01'
350
 
                         '\x7f123456789012345\nsame rabin hash\n'
351
 
                         '123456789012345\nsame rabin hash\n'
352
 
                         '123456789012345\nsame rabin hash\n'
353
 
                         '123456789012345\nsame rabin hash'
354
 
                         '\x01\n', fourth_delta)
 
448
        self.assertEqual(b'\x80\x01'
 
449
                         b'\x7f123456789012345\nsame rabin hash\n'
 
450
                         b'123456789012345\nsame rabin hash\n'
 
451
                         b'123456789012345\nsame rabin hash\n'
 
452
                         b'123456789012345\nsame rabin hash'
 
453
                         b'\x01\n', fourth_delta)
355
454
        di.add_delta_source(fourth_delta, 0)
356
455
        source += fourth_delta
357
456
        # With the next delta, everything should be found
358
457
        fifth_delta = di.make_delta(_fourth_text)
359
458
        self.assertEqual(_fourth_text,
360
459
                         self._gc_module.apply_delta(source, fifth_delta))
361
 
        self.assertEqual('\x80\x01\x91\xa7\x7f\x01\n', fifth_delta)
 
460
        self.assertEqual(b'\x80\x01\x91\xa7\x7f\x01\n', fifth_delta)
362
461
 
363
462
 
364
463
class TestCopyInstruction(tests.TestCase):
365
464
 
366
465
    def assertEncode(self, expected, offset, length):
367
 
        bytes = _groupcompress_py.encode_copy_instruction(offset, length)
368
 
        if expected != bytes:
369
 
            self.assertEqual([hex(ord(e)) for e in expected],
370
 
                             [hex(ord(b)) for b in bytes])
 
466
        data = _groupcompress_py.encode_copy_instruction(offset, length)
 
467
        self.assertEqual(expected, data)
371
468
 
372
 
    def assertDecode(self, exp_offset, exp_length, exp_newpos, bytes, pos):
373
 
        cmd = ord(bytes[pos])
 
469
    def assertDecode(self, exp_offset, exp_length, exp_newpos, data, pos):
 
470
        cmd = data[pos]
374
471
        pos += 1
375
 
        out = _groupcompress_py.decode_copy_instruction(bytes, cmd, pos)
 
472
        out = _groupcompress_py.decode_copy_instruction(data, cmd, pos)
376
473
        self.assertEqual((exp_offset, exp_length, exp_newpos), out)
377
474
 
378
475
    def test_encode_no_length(self):
379
 
        self.assertEncode('\x80', 0, 64*1024)
380
 
        self.assertEncode('\x81\x01', 1, 64*1024)
381
 
        self.assertEncode('\x81\x0a', 10, 64*1024)
382
 
        self.assertEncode('\x81\xff', 255, 64*1024)
383
 
        self.assertEncode('\x82\x01', 256, 64*1024)
384
 
        self.assertEncode('\x83\x01\x01', 257, 64*1024)
385
 
        self.assertEncode('\x8F\xff\xff\xff\xff', 0xFFFFFFFF, 64*1024)
386
 
        self.assertEncode('\x8E\xff\xff\xff', 0xFFFFFF00, 64*1024)
387
 
        self.assertEncode('\x8D\xff\xff\xff', 0xFFFF00FF, 64*1024)
388
 
        self.assertEncode('\x8B\xff\xff\xff', 0xFF00FFFF, 64*1024)
389
 
        self.assertEncode('\x87\xff\xff\xff', 0x00FFFFFF, 64*1024)
390
 
        self.assertEncode('\x8F\x04\x03\x02\x01', 0x01020304, 64*1024)
 
476
        self.assertEncode(b'\x80', 0, 64 * 1024)
 
477
        self.assertEncode(b'\x81\x01', 1, 64 * 1024)
 
478
        self.assertEncode(b'\x81\x0a', 10, 64 * 1024)
 
479
        self.assertEncode(b'\x81\xff', 255, 64 * 1024)
 
480
        self.assertEncode(b'\x82\x01', 256, 64 * 1024)
 
481
        self.assertEncode(b'\x83\x01\x01', 257, 64 * 1024)
 
482
        self.assertEncode(b'\x8F\xff\xff\xff\xff', 0xFFFFFFFF, 64 * 1024)
 
483
        self.assertEncode(b'\x8E\xff\xff\xff', 0xFFFFFF00, 64 * 1024)
 
484
        self.assertEncode(b'\x8D\xff\xff\xff', 0xFFFF00FF, 64 * 1024)
 
485
        self.assertEncode(b'\x8B\xff\xff\xff', 0xFF00FFFF, 64 * 1024)
 
486
        self.assertEncode(b'\x87\xff\xff\xff', 0x00FFFFFF, 64 * 1024)
 
487
        self.assertEncode(b'\x8F\x04\x03\x02\x01', 0x01020304, 64 * 1024)
391
488
 
392
489
    def test_encode_no_offset(self):
393
 
        self.assertEncode('\x90\x01', 0, 1)
394
 
        self.assertEncode('\x90\x0a', 0, 10)
395
 
        self.assertEncode('\x90\xff', 0, 255)
396
 
        self.assertEncode('\xA0\x01', 0, 256)
397
 
        self.assertEncode('\xB0\x01\x01', 0, 257)
398
 
        self.assertEncode('\xB0\xff\xff', 0, 0xFFFF)
 
490
        self.assertEncode(b'\x90\x01', 0, 1)
 
491
        self.assertEncode(b'\x90\x0a', 0, 10)
 
492
        self.assertEncode(b'\x90\xff', 0, 255)
 
493
        self.assertEncode(b'\xA0\x01', 0, 256)
 
494
        self.assertEncode(b'\xB0\x01\x01', 0, 257)
 
495
        self.assertEncode(b'\xB0\xff\xff', 0, 0xFFFF)
399
496
        # Special case, if copy == 64KiB, then we store exactly 0
400
497
        # Note that this puns with a copy of exactly 0 bytes, but we don't care
401
498
        # about that, as we would never actually copy 0 bytes
402
 
        self.assertEncode('\x80', 0, 64*1024)
 
499
        self.assertEncode(b'\x80', 0, 64 * 1024)
403
500
 
404
501
    def test_encode(self):
405
 
        self.assertEncode('\x91\x01\x01', 1, 1)
406
 
        self.assertEncode('\x91\x09\x0a', 9, 10)
407
 
        self.assertEncode('\x91\xfe\xff', 254, 255)
408
 
        self.assertEncode('\xA2\x02\x01', 512, 256)
409
 
        self.assertEncode('\xB3\x02\x01\x01\x01', 258, 257)
410
 
        self.assertEncode('\xB0\x01\x01', 0, 257)
 
502
        self.assertEncode(b'\x91\x01\x01', 1, 1)
 
503
        self.assertEncode(b'\x91\x09\x0a', 9, 10)
 
504
        self.assertEncode(b'\x91\xfe\xff', 254, 255)
 
505
        self.assertEncode(b'\xA2\x02\x01', 512, 256)
 
506
        self.assertEncode(b'\xB3\x02\x01\x01\x01', 258, 257)
 
507
        self.assertEncode(b'\xB0\x01\x01', 0, 257)
411
508
        # Special case, if copy == 64KiB, then we store exactly 0
412
509
        # Note that this puns with a copy of exactly 0 bytes, but we don't care
413
510
        # about that, as we would never actually copy 0 bytes
414
 
        self.assertEncode('\x81\x0a', 10, 64*1024)
 
511
        self.assertEncode(b'\x81\x0a', 10, 64 * 1024)
415
512
 
416
513
    def test_decode_no_length(self):
417
514
        # If length is 0, it is interpreted as 64KiB
418
515
        # The shortest possible instruction is a copy of 64KiB from offset 0
419
 
        self.assertDecode(0, 65536, 1, '\x80', 0)
420
 
        self.assertDecode(1, 65536, 2, '\x81\x01', 0)
421
 
        self.assertDecode(10, 65536, 2, '\x81\x0a', 0)
422
 
        self.assertDecode(255, 65536, 2, '\x81\xff', 0)
423
 
        self.assertDecode(256, 65536, 2, '\x82\x01', 0)
424
 
        self.assertDecode(257, 65536, 3, '\x83\x01\x01', 0)
425
 
        self.assertDecode(0xFFFFFFFF, 65536, 5, '\x8F\xff\xff\xff\xff', 0)
426
 
        self.assertDecode(0xFFFFFF00, 65536, 4, '\x8E\xff\xff\xff', 0)
427
 
        self.assertDecode(0xFFFF00FF, 65536, 4, '\x8D\xff\xff\xff', 0)
428
 
        self.assertDecode(0xFF00FFFF, 65536, 4, '\x8B\xff\xff\xff', 0)
429
 
        self.assertDecode(0x00FFFFFF, 65536, 4, '\x87\xff\xff\xff', 0)
430
 
        self.assertDecode(0x01020304, 65536, 5, '\x8F\x04\x03\x02\x01', 0)
 
516
        self.assertDecode(0, 65536, 1, b'\x80', 0)
 
517
        self.assertDecode(1, 65536, 2, b'\x81\x01', 0)
 
518
        self.assertDecode(10, 65536, 2, b'\x81\x0a', 0)
 
519
        self.assertDecode(255, 65536, 2, b'\x81\xff', 0)
 
520
        self.assertDecode(256, 65536, 2, b'\x82\x01', 0)
 
521
        self.assertDecode(257, 65536, 3, b'\x83\x01\x01', 0)
 
522
        self.assertDecode(0xFFFFFFFF, 65536, 5, b'\x8F\xff\xff\xff\xff', 0)
 
523
        self.assertDecode(0xFFFFFF00, 65536, 4, b'\x8E\xff\xff\xff', 0)
 
524
        self.assertDecode(0xFFFF00FF, 65536, 4, b'\x8D\xff\xff\xff', 0)
 
525
        self.assertDecode(0xFF00FFFF, 65536, 4, b'\x8B\xff\xff\xff', 0)
 
526
        self.assertDecode(0x00FFFFFF, 65536, 4, b'\x87\xff\xff\xff', 0)
 
527
        self.assertDecode(0x01020304, 65536, 5, b'\x8F\x04\x03\x02\x01', 0)
431
528
 
432
529
    def test_decode_no_offset(self):
433
 
        self.assertDecode(0, 1, 2, '\x90\x01', 0)
434
 
        self.assertDecode(0, 10, 2, '\x90\x0a', 0)
435
 
        self.assertDecode(0, 255, 2, '\x90\xff', 0)
436
 
        self.assertDecode(0, 256, 2, '\xA0\x01', 0)
437
 
        self.assertDecode(0, 257, 3, '\xB0\x01\x01', 0)
438
 
        self.assertDecode(0, 65535, 3, '\xB0\xff\xff', 0)
 
530
        self.assertDecode(0, 1, 2, b'\x90\x01', 0)
 
531
        self.assertDecode(0, 10, 2, b'\x90\x0a', 0)
 
532
        self.assertDecode(0, 255, 2, b'\x90\xff', 0)
 
533
        self.assertDecode(0, 256, 2, b'\xA0\x01', 0)
 
534
        self.assertDecode(0, 257, 3, b'\xB0\x01\x01', 0)
 
535
        self.assertDecode(0, 65535, 3, b'\xB0\xff\xff', 0)
439
536
        # Special case, if copy == 64KiB, then we store exactly 0
440
537
        # Note that this puns with a copy of exactly 0 bytes, but we don't care
441
538
        # about that, as we would never actually copy 0 bytes
442
 
        self.assertDecode(0, 65536, 1, '\x80', 0)
 
539
        self.assertDecode(0, 65536, 1, b'\x80', 0)
443
540
 
444
541
    def test_decode(self):
445
 
        self.assertDecode(1, 1, 3, '\x91\x01\x01', 0)
446
 
        self.assertDecode(9, 10, 3, '\x91\x09\x0a', 0)
447
 
        self.assertDecode(254, 255, 3, '\x91\xfe\xff', 0)
448
 
        self.assertDecode(512, 256, 3, '\xA2\x02\x01', 0)
449
 
        self.assertDecode(258, 257, 5, '\xB3\x02\x01\x01\x01', 0)
450
 
        self.assertDecode(0, 257, 3, '\xB0\x01\x01', 0)
 
542
        self.assertDecode(1, 1, 3, b'\x91\x01\x01', 0)
 
543
        self.assertDecode(9, 10, 3, b'\x91\x09\x0a', 0)
 
544
        self.assertDecode(254, 255, 3, b'\x91\xfe\xff', 0)
 
545
        self.assertDecode(512, 256, 3, b'\xA2\x02\x01', 0)
 
546
        self.assertDecode(258, 257, 5, b'\xB3\x02\x01\x01\x01', 0)
 
547
        self.assertDecode(0, 257, 3, b'\xB0\x01\x01', 0)
451
548
 
452
549
    def test_decode_not_start(self):
453
 
        self.assertDecode(1, 1, 6, 'abc\x91\x01\x01def', 3)
454
 
        self.assertDecode(9, 10, 5, 'ab\x91\x09\x0ade', 2)
455
 
        self.assertDecode(254, 255, 6, 'not\x91\xfe\xffcopy', 3)
 
550
        self.assertDecode(1, 1, 6, b'abc\x91\x01\x01def', 3)
 
551
        self.assertDecode(9, 10, 5, b'ab\x91\x09\x0ade', 2)
 
552
        self.assertDecode(254, 255, 6, b'not\x91\xfe\xffcopy', 3)
456
553
 
457
554
 
458
555
class TestBase128Int(tests.TestCase):
459
556
 
460
 
    _gc_module = None # Set by load_tests
 
557
    scenarios = module_scenarios()
 
558
 
 
559
    _gc_module = None  # Set by load_tests
461
560
 
462
561
    def assertEqualEncode(self, bytes, val):
463
562
        self.assertEqual(bytes, self._gc_module.encode_base128_int(val))
467
566
                         self._gc_module.decode_base128_int(bytes))
468
567
 
469
568
    def test_encode(self):
470
 
        self.assertEqualEncode('\x01', 1)
471
 
        self.assertEqualEncode('\x02', 2)
472
 
        self.assertEqualEncode('\x7f', 127)
473
 
        self.assertEqualEncode('\x80\x01', 128)
474
 
        self.assertEqualEncode('\xff\x01', 255)
475
 
        self.assertEqualEncode('\x80\x02', 256)
476
 
        self.assertEqualEncode('\xff\xff\xff\xff\x0f', 0xFFFFFFFF)
 
569
        self.assertEqualEncode(b'\x01', 1)
 
570
        self.assertEqualEncode(b'\x02', 2)
 
571
        self.assertEqualEncode(b'\x7f', 127)
 
572
        self.assertEqualEncode(b'\x80\x01', 128)
 
573
        self.assertEqualEncode(b'\xff\x01', 255)
 
574
        self.assertEqualEncode(b'\x80\x02', 256)
 
575
        self.assertEqualEncode(b'\xff\xff\xff\xff\x0f', 0xFFFFFFFF)
477
576
 
478
577
    def test_decode(self):
479
 
        self.assertEqualDecode(1, 1, '\x01')
480
 
        self.assertEqualDecode(2, 1, '\x02')
481
 
        self.assertEqualDecode(127, 1, '\x7f')
482
 
        self.assertEqualDecode(128, 2, '\x80\x01')
483
 
        self.assertEqualDecode(255, 2, '\xff\x01')
484
 
        self.assertEqualDecode(256, 2, '\x80\x02')
485
 
        self.assertEqualDecode(0xFFFFFFFF, 5, '\xff\xff\xff\xff\x0f')
 
578
        self.assertEqualDecode(1, 1, b'\x01')
 
579
        self.assertEqualDecode(2, 1, b'\x02')
 
580
        self.assertEqualDecode(127, 1, b'\x7f')
 
581
        self.assertEqualDecode(128, 2, b'\x80\x01')
 
582
        self.assertEqualDecode(255, 2, b'\xff\x01')
 
583
        self.assertEqualDecode(256, 2, b'\x80\x02')
 
584
        self.assertEqualDecode(0xFFFFFFFF, 5, b'\xff\xff\xff\xff\x0f')
486
585
 
487
586
    def test_decode_with_trailing_bytes(self):
488
 
        self.assertEqualDecode(1, 1, '\x01abcdef')
489
 
        self.assertEqualDecode(127, 1, '\x7f\x01')
490
 
        self.assertEqualDecode(128, 2, '\x80\x01abcdef')
491
 
        self.assertEqualDecode(255, 2, '\xff\x01\xff')
492
 
 
493
 
 
 
587
        self.assertEqualDecode(1, 1, b'\x01abcdef')
 
588
        self.assertEqualDecode(127, 1, b'\x7f\x01')
 
589
        self.assertEqualDecode(128, 2, b'\x80\x01abcdef')
 
590
        self.assertEqualDecode(255, 2, b'\xff\x01\xff')