/brz/remove-bazaar

To get this branch, use:
bzr branch http://gegoxaren.bato24.eu/bzr/brz/remove-bazaar

« back to all changes in this revision

Viewing changes to breezy/tests/test__groupcompress.py

  • Committer: Jelmer Vernooij
  • Date: 2020-04-05 19:11:34 UTC
  • mto: (7490.7.16 work)
  • mto: This revision was merged to the branch mainline in revision 7501.
  • Revision ID: jelmer@jelmer.uk-20200405191134-0aebh8ikiwygxma5
Populate the .gitignore file.

Show diffs side-by-side

added added

removed removed

Lines of Context:
1
 
# Copyright (C) 2008, 2009, 2010 Canonical Ltd
 
1
# Copyright (C) 2008-2011 Canonical Ltd
2
2
#
3
3
# This program is free software; you can redistribute it and/or modify
4
4
# it under the terms of the GNU General Public License as published by
16
16
 
17
17
"""Tests for the python and pyrex extensions of groupcompress"""
18
18
 
19
 
from bzrlib import (
20
 
    groupcompress,
 
19
import sys
 
20
 
 
21
from .. import (
 
22
    tests,
 
23
    )
 
24
from ..bzr import (
21
25
    _groupcompress_py,
22
 
    tests,
23
 
    )
24
 
 
25
 
 
26
 
def load_tests(standard_tests, module, loader):
27
 
    """Parameterize tests for all versions of groupcompress."""
28
 
    two_way_scenarios = [
 
26
    )
 
27
from .scenarios import (
 
28
    load_tests_apply_scenarios,
 
29
    )
 
30
from ..sixish import (
 
31
    indexbytes,
 
32
    )
 
33
from . import (
 
34
    features,
 
35
    )
 
36
 
 
37
 
 
38
def module_scenarios():
 
39
    scenarios = [
 
40
        ('python', {'_gc_module': _groupcompress_py}),
 
41
        ]
 
42
    if compiled_groupcompress_feature.available():
 
43
        gc_module = compiled_groupcompress_feature.module
 
44
        scenarios.append(('C',
 
45
                          {'_gc_module': gc_module}))
 
46
    return scenarios
 
47
 
 
48
 
 
49
def two_way_scenarios():
 
50
    scenarios = [
29
51
        ('PP', {'make_delta': _groupcompress_py.make_delta,
30
52
                'apply_delta': _groupcompress_py.apply_delta})
31
53
        ]
32
 
    scenarios = [
33
 
        ('python', {'_gc_module': _groupcompress_py}),
34
 
        ]
35
54
    if compiled_groupcompress_feature.available():
36
55
        gc_module = compiled_groupcompress_feature.module
37
 
        scenarios.append(('C',
38
 
            {'_gc_module': gc_module}))
39
 
        two_way_scenarios.extend([
 
56
        scenarios.extend([
40
57
            ('CC', {'make_delta': gc_module.make_delta,
41
58
                    'apply_delta': gc_module.apply_delta}),
42
59
            ('PC', {'make_delta': _groupcompress_py.make_delta,
44
61
            ('CP', {'make_delta': gc_module.make_delta,
45
62
                    'apply_delta': _groupcompress_py.apply_delta}),
46
63
            ])
47
 
    to_adapt, result = tests.split_suite_by_condition(
48
 
        standard_tests, tests.condition_isinstance((TestMakeAndApplyDelta,
49
 
                                                    TestBase128Int)))
50
 
    result = tests.multiply_tests(to_adapt, scenarios, result)
51
 
    to_adapt, result = tests.split_suite_by_condition(result,
52
 
        tests.condition_isinstance(TestMakeAndApplyCompatible))
53
 
    result = tests.multiply_tests(to_adapt, two_way_scenarios, result)
54
 
    return result
55
 
 
56
 
 
57
 
compiled_groupcompress_feature = tests.ModuleAvailableFeature(
58
 
                                    'bzrlib._groupcompress_pyx')
59
 
 
60
 
_text1 = """\
 
64
    return scenarios
 
65
 
 
66
 
 
67
load_tests = load_tests_apply_scenarios
 
68
 
 
69
 
 
70
compiled_groupcompress_feature = features.ModuleAvailableFeature(
 
71
    'breezy.bzr._groupcompress_pyx')
 
72
 
 
73
_text1 = b"""\
61
74
This is a bit
62
75
of source text
63
76
which is meant to be matched
64
77
against other text
65
78
"""
66
79
 
67
 
_text2 = """\
 
80
_text2 = b"""\
68
81
This is a bit
69
82
of source text
70
83
which is meant to differ from
71
84
against other text
72
85
"""
73
86
 
74
 
_text3 = """\
 
87
_text3 = b"""\
75
88
This is a bit
76
89
of source text
77
90
which is meant to be matched
81
94
at the end of the file
82
95
"""
83
96
 
84
 
_first_text = """\
 
97
_first_text = b"""\
85
98
a bit of text, that
86
99
does not have much in
87
100
common with the next text
88
101
"""
89
102
 
90
 
_second_text = """\
 
103
_second_text = b"""\
91
104
some more bit of text, that
92
105
does not have much in
93
106
common with the previous text
95
108
"""
96
109
 
97
110
 
98
 
_third_text = """\
 
111
_third_text = b"""\
99
112
a bit of text, that
100
113
has some in common with the previous text
101
114
and has some extra text
103
116
common with the next text
104
117
"""
105
118
 
106
 
_fourth_text = """\
 
119
_fourth_text = b"""\
107
120
123456789012345
108
121
same rabin hash
109
122
123456789012345
114
127
same rabin hash
115
128
"""
116
129
 
 
130
 
117
131
class TestMakeAndApplyDelta(tests.TestCase):
118
132
 
119
 
    _gc_module = None # Set by load_tests
 
133
    scenarios = module_scenarios()
 
134
    _gc_module = None  # Set by load_tests
120
135
 
121
136
    def setUp(self):
122
137
        super(TestMakeAndApplyDelta, self).setUp()
125
140
        self.apply_delta_to_source = self._gc_module.apply_delta_to_source
126
141
 
127
142
    def test_make_delta_is_typesafe(self):
128
 
        self.make_delta('a string', 'another string')
 
143
        self.make_delta(b'a string', b'another string')
129
144
 
130
145
        def _check_make_delta(string1, string2):
131
146
            self.assertRaises(TypeError, self.make_delta, string1, string2)
132
147
 
133
 
        _check_make_delta('a string', object())
134
 
        _check_make_delta('a string', u'not a string')
135
 
        _check_make_delta(object(), 'a string')
136
 
        _check_make_delta(u'not a string', 'a string')
 
148
        _check_make_delta(b'a string', object())
 
149
        _check_make_delta(b'a string', u'not a string')
 
150
        _check_make_delta(object(), b'a string')
 
151
        _check_make_delta(u'not a string', b'a string')
137
152
 
138
153
    def test_make_noop_delta(self):
139
154
        ident_delta = self.make_delta(_text1, _text1)
140
 
        self.assertEqual('M\x90M', ident_delta)
 
155
        self.assertEqual(b'M\x90M', ident_delta)
141
156
        ident_delta = self.make_delta(_text2, _text2)
142
 
        self.assertEqual('N\x90N', ident_delta)
 
157
        self.assertEqual(b'N\x90N', ident_delta)
143
158
        ident_delta = self.make_delta(_text3, _text3)
144
 
        self.assertEqual('\x87\x01\x90\x87', ident_delta)
 
159
        self.assertEqual(b'\x87\x01\x90\x87', ident_delta)
145
160
 
146
161
    def assertDeltaIn(self, delta1, delta2, delta):
147
162
        """Make sure that the delta bytes match one of the expectations."""
148
163
        # In general, the python delta matcher gives different results than the
149
164
        # pyrex delta matcher. Both should be valid deltas, though.
150
165
        if delta not in (delta1, delta2):
151
 
            self.fail("Delta bytes:\n"
152
 
                      "       %r\n"
153
 
                      "not in %r\n"
154
 
                      "    or %r"
 
166
            self.fail(b"Delta bytes:\n"
 
167
                      b"       %r\n"
 
168
                      b"not in %r\n"
 
169
                      b"    or %r"
155
170
                      % (delta, delta1, delta2))
156
171
 
157
172
    def test_make_delta(self):
158
173
        delta = self.make_delta(_text1, _text2)
159
174
        self.assertDeltaIn(
160
 
            'N\x90/\x1fdiffer from\nagainst other text\n',
161
 
            'N\x90\x1d\x1ewhich is meant to differ from\n\x91:\x13',
 
175
            b'N\x90/\x1fdiffer from\nagainst other text\n',
 
176
            b'N\x90\x1d\x1ewhich is meant to differ from\n\x91:\x13',
162
177
            delta)
163
178
        delta = self.make_delta(_text2, _text1)
164
179
        self.assertDeltaIn(
165
 
            'M\x90/\x1ebe matched\nagainst other text\n',
166
 
            'M\x90\x1d\x1dwhich is meant to be matched\n\x91;\x13',
 
180
            b'M\x90/\x1ebe matched\nagainst other text\n',
 
181
            b'M\x90\x1d\x1dwhich is meant to be matched\n\x91;\x13',
167
182
            delta)
168
183
        delta = self.make_delta(_text3, _text1)
169
 
        self.assertEqual('M\x90M', delta)
 
184
        self.assertEqual(b'M\x90M', delta)
170
185
        delta = self.make_delta(_text3, _text2)
171
186
        self.assertDeltaIn(
172
 
            'N\x90/\x1fdiffer from\nagainst other text\n',
173
 
            'N\x90\x1d\x1ewhich is meant to differ from\n\x91:\x13',
 
187
            b'N\x90/\x1fdiffer from\nagainst other text\n',
 
188
            b'N\x90\x1d\x1ewhich is meant to differ from\n\x91:\x13',
174
189
            delta)
175
190
 
176
191
    def test_make_delta_with_large_copies(self):
179
194
        big_text = _text3 * 1220
180
195
        delta = self.make_delta(big_text, big_text)
181
196
        self.assertDeltaIn(
182
 
            '\xdc\x86\x0a'      # Encoding the length of the uncompressed text
183
 
            '\x80'              # Copy 64kB, starting at byte 0
184
 
            '\x84\x01'          # and another 64kB starting at 64kB
185
 
            '\xb4\x02\x5c\x83', # And the bit of tail.
 
197
            b'\xdc\x86\x0a'      # Encoding the length of the uncompressed text
 
198
            b'\x80'              # Copy 64kB, starting at byte 0
 
199
            b'\x84\x01'          # and another 64kB starting at 64kB
 
200
            b'\xb4\x02\x5c\x83',  # And the bit of tail.
186
201
            None,   # Both implementations should be identical
187
202
            delta)
188
203
 
189
204
    def test_apply_delta_is_typesafe(self):
190
 
        self.apply_delta(_text1, 'M\x90M')
191
 
        self.assertRaises(TypeError, self.apply_delta, object(), 'M\x90M')
 
205
        self.apply_delta(_text1, b'M\x90M')
 
206
        self.assertRaises(TypeError, self.apply_delta, object(), b'M\x90M')
192
207
        self.assertRaises(TypeError, self.apply_delta,
193
 
                          unicode(_text1), 'M\x90M')
 
208
                          _text1.decode('latin1'), b'M\x90M')
194
209
        self.assertRaises(TypeError, self.apply_delta, _text1, u'M\x90M')
195
210
        self.assertRaises(TypeError, self.apply_delta, _text1, object())
196
211
 
197
212
    def test_apply_delta(self):
198
213
        target = self.apply_delta(_text1,
199
 
                    'N\x90/\x1fdiffer from\nagainst other text\n')
 
214
                                  b'N\x90/\x1fdiffer from\nagainst other text\n')
200
215
        self.assertEqual(_text2, target)
201
216
        target = self.apply_delta(_text2,
202
 
                    'M\x90/\x1ebe matched\nagainst other text\n')
 
217
                                  b'M\x90/\x1ebe matched\nagainst other text\n')
203
218
        self.assertEqual(_text1, target)
204
219
 
205
220
    def test_apply_delta_to_source_is_safe(self):
206
221
        self.assertRaises(TypeError,
207
 
            self.apply_delta_to_source, object(), 0, 1)
 
222
                          self.apply_delta_to_source, object(), 0, 1)
208
223
        self.assertRaises(TypeError,
209
 
            self.apply_delta_to_source, u'unicode str', 0, 1)
 
224
                          self.apply_delta_to_source, u'unicode str', 0, 1)
210
225
        # end > length
211
226
        self.assertRaises(ValueError,
212
 
            self.apply_delta_to_source, 'foo', 1, 4)
 
227
                          self.apply_delta_to_source, b'foo', 1, 4)
213
228
        # start > length
214
229
        self.assertRaises(ValueError,
215
 
            self.apply_delta_to_source, 'foo', 5, 3)
 
230
                          self.apply_delta_to_source, b'foo', 5, 3)
216
231
        # start > end
217
232
        self.assertRaises(ValueError,
218
 
            self.apply_delta_to_source, 'foo', 3, 2)
 
233
                          self.apply_delta_to_source, b'foo', 3, 2)
219
234
 
220
235
    def test_apply_delta_to_source(self):
221
236
        source_and_delta = (_text1
222
 
                            + 'N\x90/\x1fdiffer from\nagainst other text\n')
 
237
                            + b'N\x90/\x1fdiffer from\nagainst other text\n')
223
238
        self.assertEqual(_text2, self.apply_delta_to_source(source_and_delta,
224
 
                                    len(_text1), len(source_and_delta)))
 
239
                                                            len(_text1), len(source_and_delta)))
225
240
 
226
241
 
227
242
class TestMakeAndApplyCompatible(tests.TestCase):
228
243
 
229
 
    make_delta = None # Set by load_tests
230
 
    apply_delta = None # Set by load_tests
 
244
    scenarios = two_way_scenarios()
 
245
 
 
246
    make_delta = None  # Set by load_tests
 
247
    apply_delta = None  # Set by load_tests
231
248
 
232
249
    def assertMakeAndApply(self, source, target):
233
250
        """Assert that generating a delta and applying gives success."""
255
272
        self._gc_module = compiled_groupcompress_feature.module
256
273
 
257
274
    def test_repr(self):
258
 
        di = self._gc_module.DeltaIndex('test text\n')
 
275
        di = self._gc_module.DeltaIndex(b'test text\n')
259
276
        self.assertEqual('DeltaIndex(1, 10)', repr(di))
260
277
 
 
278
    def test_sizeof(self):
 
279
        di = self._gc_module.DeltaIndex()
 
280
        # Exact value will depend on platform but should include sources
 
281
        # source_info is a pointer and two longs so at least 12 bytes
 
282
        lower_bound = di._max_num_sources * 12
 
283
        self.assertGreater(sys.getsizeof(di), lower_bound)
 
284
 
 
285
    def test__dump_no_index(self):
 
286
        di = self._gc_module.DeltaIndex()
 
287
        self.assertEqual(None, di._dump_index())
 
288
 
 
289
    def test__dump_index_simple(self):
 
290
        di = self._gc_module.DeltaIndex()
 
291
        di.add_source(_text1, 0)
 
292
        self.assertFalse(di._has_index())
 
293
        self.assertEqual(None, di._dump_index())
 
294
        _ = di.make_delta(_text1)
 
295
        self.assertTrue(di._has_index())
 
296
        hash_list, entry_list = di._dump_index()
 
297
        self.assertEqual(16, len(hash_list))
 
298
        self.assertEqual(68, len(entry_list))
 
299
        just_entries = [(idx, text_offset, hash_val)
 
300
                        for idx, (text_offset, hash_val)
 
301
                        in enumerate(entry_list)
 
302
                        if text_offset != 0 or hash_val != 0]
 
303
        rabin_hash = self._gc_module._rabin_hash
 
304
        self.assertEqual([(8, 16, rabin_hash(_text1[1:17])),
 
305
                          (25, 48, rabin_hash(_text1[33:49])),
 
306
                          (34, 32, rabin_hash(_text1[17:33])),
 
307
                          (47, 64, rabin_hash(_text1[49:65])),
 
308
                          ], just_entries)
 
309
        # This ensures that the hash map points to the location we expect it to
 
310
        for entry_idx, text_offset, hash_val in just_entries:
 
311
            self.assertEqual(entry_idx, hash_list[hash_val & 0xf])
 
312
 
 
313
    def test__dump_index_two_sources(self):
 
314
        di = self._gc_module.DeltaIndex()
 
315
        di.add_source(_text1, 0)
 
316
        di.add_source(_text2, 2)
 
317
        start2 = len(_text1) + 2
 
318
        self.assertTrue(di._has_index())
 
319
        hash_list, entry_list = di._dump_index()
 
320
        self.assertEqual(16, len(hash_list))
 
321
        self.assertEqual(68, len(entry_list))
 
322
        just_entries = [(idx, text_offset, hash_val)
 
323
                        for idx, (text_offset, hash_val)
 
324
                        in enumerate(entry_list)
 
325
                        if text_offset != 0 or hash_val != 0]
 
326
        rabin_hash = self._gc_module._rabin_hash
 
327
        self.assertEqual([(8, 16, rabin_hash(_text1[1:17])),
 
328
                          (9, start2 + 16, rabin_hash(_text2[1:17])),
 
329
                          (25, 48, rabin_hash(_text1[33:49])),
 
330
                          (30, start2 + 64, rabin_hash(_text2[49:65])),
 
331
                          (34, 32, rabin_hash(_text1[17:33])),
 
332
                          (35, start2 + 32, rabin_hash(_text2[17:33])),
 
333
                          (43, start2 + 48, rabin_hash(_text2[33:49])),
 
334
                          (47, 64, rabin_hash(_text1[49:65])),
 
335
                          ], just_entries)
 
336
        # Each entry should be in the appropriate hash bucket.
 
337
        for entry_idx, text_offset, hash_val in just_entries:
 
338
            hash_idx = hash_val & 0xf
 
339
            self.assertTrue(
 
340
                hash_list[hash_idx] <= entry_idx < hash_list[hash_idx + 1])
 
341
 
261
342
    def test_first_add_source_doesnt_index_until_make_delta(self):
262
343
        di = self._gc_module.DeltaIndex()
263
344
        self.assertFalse(di._has_index())
267
348
        # generated, and will generate a proper delta
268
349
        delta = di.make_delta(_text2)
269
350
        self.assertTrue(di._has_index())
270
 
        self.assertEqual('N\x90/\x1fdiffer from\nagainst other text\n', delta)
 
351
        self.assertEqual(b'N\x90/\x1fdiffer from\nagainst other text\n', delta)
 
352
 
 
353
    def test_add_source_max_bytes_to_index(self):
 
354
        di = self._gc_module.DeltaIndex()
 
355
        di._max_bytes_to_index = 3 * 16
 
356
        di.add_source(_text1, 0)  # (77 bytes -1) // 3 = 25 byte stride
 
357
        di.add_source(_text3, 3)  # (135 bytes -1) // 3 = 44 byte stride
 
358
        start2 = len(_text1) + 3
 
359
        hash_list, entry_list = di._dump_index()
 
360
        self.assertEqual(16, len(hash_list))
 
361
        self.assertEqual(67, len(entry_list))
 
362
        just_entries = sorted([(text_offset, hash_val)
 
363
                               for text_offset, hash_val in entry_list
 
364
                               if text_offset != 0 or hash_val != 0])
 
365
        rabin_hash = self._gc_module._rabin_hash
 
366
        self.assertEqual([(25, rabin_hash(_text1[10:26])),
 
367
                          (50, rabin_hash(_text1[35:51])),
 
368
                          (75, rabin_hash(_text1[60:76])),
 
369
                          (start2 + 44, rabin_hash(_text3[29:45])),
 
370
                          (start2 + 88, rabin_hash(_text3[73:89])),
 
371
                          (start2 + 132, rabin_hash(_text3[117:133])),
 
372
                          ], just_entries)
271
373
 
272
374
    def test_second_add_source_triggers_make_index(self):
273
375
        di = self._gc_module.DeltaIndex()
280
382
    def test_make_delta(self):
281
383
        di = self._gc_module.DeltaIndex(_text1)
282
384
        delta = di.make_delta(_text2)
283
 
        self.assertEqual('N\x90/\x1fdiffer from\nagainst other text\n', delta)
 
385
        self.assertEqual(b'N\x90/\x1fdiffer from\nagainst other text\n', delta)
284
386
 
285
387
    def test_delta_against_multiple_sources(self):
286
388
        di = self._gc_module.DeltaIndex()
292
394
        delta = di.make_delta(_third_text)
293
395
        result = self._gc_module.apply_delta(_first_text + _second_text, delta)
294
396
        self.assertEqualDiff(_third_text, result)
295
 
        self.assertEqual('\x85\x01\x90\x14\x0chas some in '
296
 
                         '\x91v6\x03and\x91d"\x91:\n', delta)
 
397
        self.assertEqual(b'\x85\x01\x90\x14\x0chas some in '
 
398
                         b'\x91v6\x03and\x91d"\x91:\n', delta)
297
399
 
298
400
    def test_delta_with_offsets(self):
299
401
        di = self._gc_module.DeltaIndex()
305
407
        delta = di.make_delta(_third_text)
306
408
        self.assertIsNot(None, delta)
307
409
        result = self._gc_module.apply_delta(
308
 
            '12345' + _first_text + '1234567890' + _second_text, delta)
 
410
            b'12345' + _first_text + b'1234567890' + _second_text, delta)
309
411
        self.assertIsNot(None, result)
310
412
        self.assertEqualDiff(_third_text, result)
311
 
        self.assertEqual('\x85\x01\x91\x05\x14\x0chas some in '
312
 
                         '\x91\x856\x03and\x91s"\x91?\n', delta)
 
413
        self.assertEqual(b'\x85\x01\x91\x05\x14\x0chas some in '
 
414
                         b'\x91\x856\x03and\x91s"\x91?\n', delta)
313
415
 
314
416
    def test_delta_with_delta_bytes(self):
315
417
        di = self._gc_module.DeltaIndex()
317
419
        di.add_source(_first_text, 0)
318
420
        self.assertEqual(len(_first_text), di._source_offset)
319
421
        delta = di.make_delta(_second_text)
320
 
        self.assertEqual('h\tsome more\x91\x019'
321
 
                         '&previous text\nand has some extra text\n', delta)
 
422
        self.assertEqual(b'h\tsome more\x91\x019'
 
423
                         b'&previous text\nand has some extra text\n', delta)
322
424
        di.add_delta_source(delta, 0)
323
425
        source += delta
324
426
        self.assertEqual(len(_first_text) + len(delta), di._source_offset)
330
432
        # Note that we don't match the 'common with the', because it isn't long
331
433
        # enough to match in the original text, and those bytes are not present
332
434
        # in the delta for the second text.
333
 
        self.assertEqual('\x85\x01\x90\x14\x1chas some in common with the '
334
 
                         '\x91S&\x03and\x91\x18,', second_delta)
 
435
        self.assertEqual(b'\x85\x01\x90\x14\x1chas some in common with the '
 
436
                         b'\x91S&\x03and\x91\x18,', second_delta)
335
437
        # Add this delta, and create a new delta for the same text. We should
336
438
        # find the remaining text, and only insert the short 'and' text.
337
439
        di.add_delta_source(second_delta, 0)
339
441
        third_delta = di.make_delta(_third_text)
340
442
        result = self._gc_module.apply_delta(source, third_delta)
341
443
        self.assertEqualDiff(_third_text, result)
342
 
        self.assertEqual('\x85\x01\x90\x14\x91\x7e\x1c'
343
 
                         '\x91S&\x03and\x91\x18,', third_delta)
 
444
        self.assertEqual(b'\x85\x01\x90\x14\x91\x7e\x1c'
 
445
                         b'\x91S&\x03and\x91\x18,', third_delta)
344
446
        # Now create a delta, which we know won't be able to be 'fit' into the
345
447
        # existing index
346
448
        fourth_delta = di.make_delta(_fourth_text)
347
449
        self.assertEqual(_fourth_text,
348
450
                         self._gc_module.apply_delta(source, fourth_delta))
349
 
        self.assertEqual('\x80\x01'
350
 
                         '\x7f123456789012345\nsame rabin hash\n'
351
 
                         '123456789012345\nsame rabin hash\n'
352
 
                         '123456789012345\nsame rabin hash\n'
353
 
                         '123456789012345\nsame rabin hash'
354
 
                         '\x01\n', fourth_delta)
 
451
        self.assertEqual(b'\x80\x01'
 
452
                         b'\x7f123456789012345\nsame rabin hash\n'
 
453
                         b'123456789012345\nsame rabin hash\n'
 
454
                         b'123456789012345\nsame rabin hash\n'
 
455
                         b'123456789012345\nsame rabin hash'
 
456
                         b'\x01\n', fourth_delta)
355
457
        di.add_delta_source(fourth_delta, 0)
356
458
        source += fourth_delta
357
459
        # With the next delta, everything should be found
358
460
        fifth_delta = di.make_delta(_fourth_text)
359
461
        self.assertEqual(_fourth_text,
360
462
                         self._gc_module.apply_delta(source, fifth_delta))
361
 
        self.assertEqual('\x80\x01\x91\xa7\x7f\x01\n', fifth_delta)
 
463
        self.assertEqual(b'\x80\x01\x91\xa7\x7f\x01\n', fifth_delta)
362
464
 
363
465
 
364
466
class TestCopyInstruction(tests.TestCase):
365
467
 
366
468
    def assertEncode(self, expected, offset, length):
367
 
        bytes = _groupcompress_py.encode_copy_instruction(offset, length)
368
 
        if expected != bytes:
369
 
            self.assertEqual([hex(ord(e)) for e in expected],
370
 
                             [hex(ord(b)) for b in bytes])
 
469
        data = _groupcompress_py.encode_copy_instruction(offset, length)
 
470
        self.assertEqual(expected, data)
371
471
 
372
 
    def assertDecode(self, exp_offset, exp_length, exp_newpos, bytes, pos):
373
 
        cmd = ord(bytes[pos])
 
472
    def assertDecode(self, exp_offset, exp_length, exp_newpos, data, pos):
 
473
        cmd = indexbytes(data, pos)
374
474
        pos += 1
375
 
        out = _groupcompress_py.decode_copy_instruction(bytes, cmd, pos)
 
475
        out = _groupcompress_py.decode_copy_instruction(data, cmd, pos)
376
476
        self.assertEqual((exp_offset, exp_length, exp_newpos), out)
377
477
 
378
478
    def test_encode_no_length(self):
379
 
        self.assertEncode('\x80', 0, 64*1024)
380
 
        self.assertEncode('\x81\x01', 1, 64*1024)
381
 
        self.assertEncode('\x81\x0a', 10, 64*1024)
382
 
        self.assertEncode('\x81\xff', 255, 64*1024)
383
 
        self.assertEncode('\x82\x01', 256, 64*1024)
384
 
        self.assertEncode('\x83\x01\x01', 257, 64*1024)
385
 
        self.assertEncode('\x8F\xff\xff\xff\xff', 0xFFFFFFFF, 64*1024)
386
 
        self.assertEncode('\x8E\xff\xff\xff', 0xFFFFFF00, 64*1024)
387
 
        self.assertEncode('\x8D\xff\xff\xff', 0xFFFF00FF, 64*1024)
388
 
        self.assertEncode('\x8B\xff\xff\xff', 0xFF00FFFF, 64*1024)
389
 
        self.assertEncode('\x87\xff\xff\xff', 0x00FFFFFF, 64*1024)
390
 
        self.assertEncode('\x8F\x04\x03\x02\x01', 0x01020304, 64*1024)
 
479
        self.assertEncode(b'\x80', 0, 64 * 1024)
 
480
        self.assertEncode(b'\x81\x01', 1, 64 * 1024)
 
481
        self.assertEncode(b'\x81\x0a', 10, 64 * 1024)
 
482
        self.assertEncode(b'\x81\xff', 255, 64 * 1024)
 
483
        self.assertEncode(b'\x82\x01', 256, 64 * 1024)
 
484
        self.assertEncode(b'\x83\x01\x01', 257, 64 * 1024)
 
485
        self.assertEncode(b'\x8F\xff\xff\xff\xff', 0xFFFFFFFF, 64 * 1024)
 
486
        self.assertEncode(b'\x8E\xff\xff\xff', 0xFFFFFF00, 64 * 1024)
 
487
        self.assertEncode(b'\x8D\xff\xff\xff', 0xFFFF00FF, 64 * 1024)
 
488
        self.assertEncode(b'\x8B\xff\xff\xff', 0xFF00FFFF, 64 * 1024)
 
489
        self.assertEncode(b'\x87\xff\xff\xff', 0x00FFFFFF, 64 * 1024)
 
490
        self.assertEncode(b'\x8F\x04\x03\x02\x01', 0x01020304, 64 * 1024)
391
491
 
392
492
    def test_encode_no_offset(self):
393
 
        self.assertEncode('\x90\x01', 0, 1)
394
 
        self.assertEncode('\x90\x0a', 0, 10)
395
 
        self.assertEncode('\x90\xff', 0, 255)
396
 
        self.assertEncode('\xA0\x01', 0, 256)
397
 
        self.assertEncode('\xB0\x01\x01', 0, 257)
398
 
        self.assertEncode('\xB0\xff\xff', 0, 0xFFFF)
 
493
        self.assertEncode(b'\x90\x01', 0, 1)
 
494
        self.assertEncode(b'\x90\x0a', 0, 10)
 
495
        self.assertEncode(b'\x90\xff', 0, 255)
 
496
        self.assertEncode(b'\xA0\x01', 0, 256)
 
497
        self.assertEncode(b'\xB0\x01\x01', 0, 257)
 
498
        self.assertEncode(b'\xB0\xff\xff', 0, 0xFFFF)
399
499
        # Special case, if copy == 64KiB, then we store exactly 0
400
500
        # Note that this puns with a copy of exactly 0 bytes, but we don't care
401
501
        # about that, as we would never actually copy 0 bytes
402
 
        self.assertEncode('\x80', 0, 64*1024)
 
502
        self.assertEncode(b'\x80', 0, 64 * 1024)
403
503
 
404
504
    def test_encode(self):
405
 
        self.assertEncode('\x91\x01\x01', 1, 1)
406
 
        self.assertEncode('\x91\x09\x0a', 9, 10)
407
 
        self.assertEncode('\x91\xfe\xff', 254, 255)
408
 
        self.assertEncode('\xA2\x02\x01', 512, 256)
409
 
        self.assertEncode('\xB3\x02\x01\x01\x01', 258, 257)
410
 
        self.assertEncode('\xB0\x01\x01', 0, 257)
 
505
        self.assertEncode(b'\x91\x01\x01', 1, 1)
 
506
        self.assertEncode(b'\x91\x09\x0a', 9, 10)
 
507
        self.assertEncode(b'\x91\xfe\xff', 254, 255)
 
508
        self.assertEncode(b'\xA2\x02\x01', 512, 256)
 
509
        self.assertEncode(b'\xB3\x02\x01\x01\x01', 258, 257)
 
510
        self.assertEncode(b'\xB0\x01\x01', 0, 257)
411
511
        # Special case, if copy == 64KiB, then we store exactly 0
412
512
        # Note that this puns with a copy of exactly 0 bytes, but we don't care
413
513
        # about that, as we would never actually copy 0 bytes
414
 
        self.assertEncode('\x81\x0a', 10, 64*1024)
 
514
        self.assertEncode(b'\x81\x0a', 10, 64 * 1024)
415
515
 
416
516
    def test_decode_no_length(self):
417
517
        # If length is 0, it is interpreted as 64KiB
418
518
        # The shortest possible instruction is a copy of 64KiB from offset 0
419
 
        self.assertDecode(0, 65536, 1, '\x80', 0)
420
 
        self.assertDecode(1, 65536, 2, '\x81\x01', 0)
421
 
        self.assertDecode(10, 65536, 2, '\x81\x0a', 0)
422
 
        self.assertDecode(255, 65536, 2, '\x81\xff', 0)
423
 
        self.assertDecode(256, 65536, 2, '\x82\x01', 0)
424
 
        self.assertDecode(257, 65536, 3, '\x83\x01\x01', 0)
425
 
        self.assertDecode(0xFFFFFFFF, 65536, 5, '\x8F\xff\xff\xff\xff', 0)
426
 
        self.assertDecode(0xFFFFFF00, 65536, 4, '\x8E\xff\xff\xff', 0)
427
 
        self.assertDecode(0xFFFF00FF, 65536, 4, '\x8D\xff\xff\xff', 0)
428
 
        self.assertDecode(0xFF00FFFF, 65536, 4, '\x8B\xff\xff\xff', 0)
429
 
        self.assertDecode(0x00FFFFFF, 65536, 4, '\x87\xff\xff\xff', 0)
430
 
        self.assertDecode(0x01020304, 65536, 5, '\x8F\x04\x03\x02\x01', 0)
 
519
        self.assertDecode(0, 65536, 1, b'\x80', 0)
 
520
        self.assertDecode(1, 65536, 2, b'\x81\x01', 0)
 
521
        self.assertDecode(10, 65536, 2, b'\x81\x0a', 0)
 
522
        self.assertDecode(255, 65536, 2, b'\x81\xff', 0)
 
523
        self.assertDecode(256, 65536, 2, b'\x82\x01', 0)
 
524
        self.assertDecode(257, 65536, 3, b'\x83\x01\x01', 0)
 
525
        self.assertDecode(0xFFFFFFFF, 65536, 5, b'\x8F\xff\xff\xff\xff', 0)
 
526
        self.assertDecode(0xFFFFFF00, 65536, 4, b'\x8E\xff\xff\xff', 0)
 
527
        self.assertDecode(0xFFFF00FF, 65536, 4, b'\x8D\xff\xff\xff', 0)
 
528
        self.assertDecode(0xFF00FFFF, 65536, 4, b'\x8B\xff\xff\xff', 0)
 
529
        self.assertDecode(0x00FFFFFF, 65536, 4, b'\x87\xff\xff\xff', 0)
 
530
        self.assertDecode(0x01020304, 65536, 5, b'\x8F\x04\x03\x02\x01', 0)
431
531
 
432
532
    def test_decode_no_offset(self):
433
 
        self.assertDecode(0, 1, 2, '\x90\x01', 0)
434
 
        self.assertDecode(0, 10, 2, '\x90\x0a', 0)
435
 
        self.assertDecode(0, 255, 2, '\x90\xff', 0)
436
 
        self.assertDecode(0, 256, 2, '\xA0\x01', 0)
437
 
        self.assertDecode(0, 257, 3, '\xB0\x01\x01', 0)
438
 
        self.assertDecode(0, 65535, 3, '\xB0\xff\xff', 0)
 
533
        self.assertDecode(0, 1, 2, b'\x90\x01', 0)
 
534
        self.assertDecode(0, 10, 2, b'\x90\x0a', 0)
 
535
        self.assertDecode(0, 255, 2, b'\x90\xff', 0)
 
536
        self.assertDecode(0, 256, 2, b'\xA0\x01', 0)
 
537
        self.assertDecode(0, 257, 3, b'\xB0\x01\x01', 0)
 
538
        self.assertDecode(0, 65535, 3, b'\xB0\xff\xff', 0)
439
539
        # Special case, if copy == 64KiB, then we store exactly 0
440
540
        # Note that this puns with a copy of exactly 0 bytes, but we don't care
441
541
        # about that, as we would never actually copy 0 bytes
442
 
        self.assertDecode(0, 65536, 1, '\x80', 0)
 
542
        self.assertDecode(0, 65536, 1, b'\x80', 0)
443
543
 
444
544
    def test_decode(self):
445
 
        self.assertDecode(1, 1, 3, '\x91\x01\x01', 0)
446
 
        self.assertDecode(9, 10, 3, '\x91\x09\x0a', 0)
447
 
        self.assertDecode(254, 255, 3, '\x91\xfe\xff', 0)
448
 
        self.assertDecode(512, 256, 3, '\xA2\x02\x01', 0)
449
 
        self.assertDecode(258, 257, 5, '\xB3\x02\x01\x01\x01', 0)
450
 
        self.assertDecode(0, 257, 3, '\xB0\x01\x01', 0)
 
545
        self.assertDecode(1, 1, 3, b'\x91\x01\x01', 0)
 
546
        self.assertDecode(9, 10, 3, b'\x91\x09\x0a', 0)
 
547
        self.assertDecode(254, 255, 3, b'\x91\xfe\xff', 0)
 
548
        self.assertDecode(512, 256, 3, b'\xA2\x02\x01', 0)
 
549
        self.assertDecode(258, 257, 5, b'\xB3\x02\x01\x01\x01', 0)
 
550
        self.assertDecode(0, 257, 3, b'\xB0\x01\x01', 0)
451
551
 
452
552
    def test_decode_not_start(self):
453
 
        self.assertDecode(1, 1, 6, 'abc\x91\x01\x01def', 3)
454
 
        self.assertDecode(9, 10, 5, 'ab\x91\x09\x0ade', 2)
455
 
        self.assertDecode(254, 255, 6, 'not\x91\xfe\xffcopy', 3)
 
553
        self.assertDecode(1, 1, 6, b'abc\x91\x01\x01def', 3)
 
554
        self.assertDecode(9, 10, 5, b'ab\x91\x09\x0ade', 2)
 
555
        self.assertDecode(254, 255, 6, b'not\x91\xfe\xffcopy', 3)
456
556
 
457
557
 
458
558
class TestBase128Int(tests.TestCase):
459
559
 
460
 
    _gc_module = None # Set by load_tests
 
560
    scenarios = module_scenarios()
 
561
 
 
562
    _gc_module = None  # Set by load_tests
461
563
 
462
564
    def assertEqualEncode(self, bytes, val):
463
565
        self.assertEqual(bytes, self._gc_module.encode_base128_int(val))
467
569
                         self._gc_module.decode_base128_int(bytes))
468
570
 
469
571
    def test_encode(self):
470
 
        self.assertEqualEncode('\x01', 1)
471
 
        self.assertEqualEncode('\x02', 2)
472
 
        self.assertEqualEncode('\x7f', 127)
473
 
        self.assertEqualEncode('\x80\x01', 128)
474
 
        self.assertEqualEncode('\xff\x01', 255)
475
 
        self.assertEqualEncode('\x80\x02', 256)
476
 
        self.assertEqualEncode('\xff\xff\xff\xff\x0f', 0xFFFFFFFF)
 
572
        self.assertEqualEncode(b'\x01', 1)
 
573
        self.assertEqualEncode(b'\x02', 2)
 
574
        self.assertEqualEncode(b'\x7f', 127)
 
575
        self.assertEqualEncode(b'\x80\x01', 128)
 
576
        self.assertEqualEncode(b'\xff\x01', 255)
 
577
        self.assertEqualEncode(b'\x80\x02', 256)
 
578
        self.assertEqualEncode(b'\xff\xff\xff\xff\x0f', 0xFFFFFFFF)
477
579
 
478
580
    def test_decode(self):
479
 
        self.assertEqualDecode(1, 1, '\x01')
480
 
        self.assertEqualDecode(2, 1, '\x02')
481
 
        self.assertEqualDecode(127, 1, '\x7f')
482
 
        self.assertEqualDecode(128, 2, '\x80\x01')
483
 
        self.assertEqualDecode(255, 2, '\xff\x01')
484
 
        self.assertEqualDecode(256, 2, '\x80\x02')
485
 
        self.assertEqualDecode(0xFFFFFFFF, 5, '\xff\xff\xff\xff\x0f')
 
581
        self.assertEqualDecode(1, 1, b'\x01')
 
582
        self.assertEqualDecode(2, 1, b'\x02')
 
583
        self.assertEqualDecode(127, 1, b'\x7f')
 
584
        self.assertEqualDecode(128, 2, b'\x80\x01')
 
585
        self.assertEqualDecode(255, 2, b'\xff\x01')
 
586
        self.assertEqualDecode(256, 2, b'\x80\x02')
 
587
        self.assertEqualDecode(0xFFFFFFFF, 5, b'\xff\xff\xff\xff\x0f')
486
588
 
487
589
    def test_decode_with_trailing_bytes(self):
488
 
        self.assertEqualDecode(1, 1, '\x01abcdef')
489
 
        self.assertEqualDecode(127, 1, '\x7f\x01')
490
 
        self.assertEqualDecode(128, 2, '\x80\x01abcdef')
491
 
        self.assertEqualDecode(255, 2, '\xff\x01\xff')
492
 
 
493
 
 
 
590
        self.assertEqualDecode(1, 1, b'\x01abcdef')
 
591
        self.assertEqualDecode(127, 1, b'\x7f\x01')
 
592
        self.assertEqualDecode(128, 2, b'\x80\x01abcdef')
 
593
        self.assertEqualDecode(255, 2, b'\xff\x01\xff')