/brz/remove-bazaar

To get this branch, use:
bzr branch http://gegoxaren.bato24.eu/bzr/brz/remove-bazaar

« back to all changes in this revision

Viewing changes to breezy/tests/test__groupcompress.py

  • Committer: Jelmer Vernooij
  • Date: 2020-05-24 00:39:50 UTC
  • mto: This revision was merged to the branch mainline in revision 7504.
  • Revision ID: jelmer@jelmer.uk-20200524003950-bbc545r76vc5yajg
Add github action.

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
# Copyright (C) 2008-2011 Canonical Ltd
 
2
#
 
3
# This program is free software; you can redistribute it and/or modify
 
4
# it under the terms of the GNU General Public License as published by
 
5
# the Free Software Foundation; either version 2 of the License, or
 
6
# (at your option) any later version.
 
7
#
 
8
# This program is distributed in the hope that it will be useful,
 
9
# but WITHOUT ANY WARRANTY; without even the implied warranty of
 
10
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
11
# GNU General Public License for more details.
 
12
#
 
13
# You should have received a copy of the GNU General Public License
 
14
# along with this program; if not, write to the Free Software
 
15
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
 
16
 
 
17
"""Tests for the python and pyrex extensions of groupcompress"""
 
18
 
 
19
import sys
 
20
 
 
21
from .. import (
 
22
    tests,
 
23
    )
 
24
from ..bzr import (
 
25
    _groupcompress_py,
 
26
    )
 
27
from .scenarios import (
 
28
    load_tests_apply_scenarios,
 
29
    )
 
30
from . import (
 
31
    features,
 
32
    )
 
33
 
 
34
 
 
35
def module_scenarios():
 
36
    scenarios = [
 
37
        ('python', {'_gc_module': _groupcompress_py}),
 
38
        ]
 
39
    if compiled_groupcompress_feature.available():
 
40
        gc_module = compiled_groupcompress_feature.module
 
41
        scenarios.append(('C',
 
42
                          {'_gc_module': gc_module}))
 
43
    return scenarios
 
44
 
 
45
 
 
46
def two_way_scenarios():
 
47
    scenarios = [
 
48
        ('PP', {'make_delta': _groupcompress_py.make_delta,
 
49
                'apply_delta': _groupcompress_py.apply_delta})
 
50
        ]
 
51
    if compiled_groupcompress_feature.available():
 
52
        gc_module = compiled_groupcompress_feature.module
 
53
        scenarios.extend([
 
54
            ('CC', {'make_delta': gc_module.make_delta,
 
55
                    'apply_delta': gc_module.apply_delta}),
 
56
            ('PC', {'make_delta': _groupcompress_py.make_delta,
 
57
                    'apply_delta': gc_module.apply_delta}),
 
58
            ('CP', {'make_delta': gc_module.make_delta,
 
59
                    'apply_delta': _groupcompress_py.apply_delta}),
 
60
            ])
 
61
    return scenarios
 
62
 
 
63
 
 
64
load_tests = load_tests_apply_scenarios
 
65
 
 
66
 
 
67
compiled_groupcompress_feature = features.ModuleAvailableFeature(
 
68
    'breezy.bzr._groupcompress_pyx')
 
69
 
 
70
_text1 = b"""\
 
71
This is a bit
 
72
of source text
 
73
which is meant to be matched
 
74
against other text
 
75
"""
 
76
 
 
77
_text2 = b"""\
 
78
This is a bit
 
79
of source text
 
80
which is meant to differ from
 
81
against other text
 
82
"""
 
83
 
 
84
_text3 = b"""\
 
85
This is a bit
 
86
of source text
 
87
which is meant to be matched
 
88
against other text
 
89
except it also
 
90
has a lot more data
 
91
at the end of the file
 
92
"""
 
93
 
 
94
_first_text = b"""\
 
95
a bit of text, that
 
96
does not have much in
 
97
common with the next text
 
98
"""
 
99
 
 
100
_second_text = b"""\
 
101
some more bit of text, that
 
102
does not have much in
 
103
common with the previous text
 
104
and has some extra text
 
105
"""
 
106
 
 
107
 
 
108
_third_text = b"""\
 
109
a bit of text, that
 
110
has some in common with the previous text
 
111
and has some extra text
 
112
and not have much in
 
113
common with the next text
 
114
"""
 
115
 
 
116
_fourth_text = b"""\
 
117
123456789012345
 
118
same rabin hash
 
119
123456789012345
 
120
same rabin hash
 
121
123456789012345
 
122
same rabin hash
 
123
123456789012345
 
124
same rabin hash
 
125
"""
 
126
 
 
127
 
 
128
class TestMakeAndApplyDelta(tests.TestCase):
 
129
 
 
130
    scenarios = module_scenarios()
 
131
    _gc_module = None  # Set by load_tests
 
132
 
 
133
    def setUp(self):
 
134
        super(TestMakeAndApplyDelta, self).setUp()
 
135
        self.make_delta = self._gc_module.make_delta
 
136
        self.apply_delta = self._gc_module.apply_delta
 
137
        self.apply_delta_to_source = self._gc_module.apply_delta_to_source
 
138
 
 
139
    def test_make_delta_is_typesafe(self):
 
140
        self.make_delta(b'a string', b'another string')
 
141
 
 
142
        def _check_make_delta(string1, string2):
 
143
            self.assertRaises(TypeError, self.make_delta, string1, string2)
 
144
 
 
145
        _check_make_delta(b'a string', object())
 
146
        _check_make_delta(b'a string', u'not a string')
 
147
        _check_make_delta(object(), b'a string')
 
148
        _check_make_delta(u'not a string', b'a string')
 
149
 
 
150
    def test_make_noop_delta(self):
 
151
        ident_delta = self.make_delta(_text1, _text1)
 
152
        self.assertEqual(b'M\x90M', ident_delta)
 
153
        ident_delta = self.make_delta(_text2, _text2)
 
154
        self.assertEqual(b'N\x90N', ident_delta)
 
155
        ident_delta = self.make_delta(_text3, _text3)
 
156
        self.assertEqual(b'\x87\x01\x90\x87', ident_delta)
 
157
 
 
158
    def assertDeltaIn(self, delta1, delta2, delta):
 
159
        """Make sure that the delta bytes match one of the expectations."""
 
160
        # In general, the python delta matcher gives different results than the
 
161
        # pyrex delta matcher. Both should be valid deltas, though.
 
162
        if delta not in (delta1, delta2):
 
163
            self.fail(b"Delta bytes:\n"
 
164
                      b"       %r\n"
 
165
                      b"not in %r\n"
 
166
                      b"    or %r"
 
167
                      % (delta, delta1, delta2))
 
168
 
 
169
    def test_make_delta(self):
 
170
        delta = self.make_delta(_text1, _text2)
 
171
        self.assertDeltaIn(
 
172
            b'N\x90/\x1fdiffer from\nagainst other text\n',
 
173
            b'N\x90\x1d\x1ewhich is meant to differ from\n\x91:\x13',
 
174
            delta)
 
175
        delta = self.make_delta(_text2, _text1)
 
176
        self.assertDeltaIn(
 
177
            b'M\x90/\x1ebe matched\nagainst other text\n',
 
178
            b'M\x90\x1d\x1dwhich is meant to be matched\n\x91;\x13',
 
179
            delta)
 
180
        delta = self.make_delta(_text3, _text1)
 
181
        self.assertEqual(b'M\x90M', delta)
 
182
        delta = self.make_delta(_text3, _text2)
 
183
        self.assertDeltaIn(
 
184
            b'N\x90/\x1fdiffer from\nagainst other text\n',
 
185
            b'N\x90\x1d\x1ewhich is meant to differ from\n\x91:\x13',
 
186
            delta)
 
187
 
 
188
    def test_make_delta_with_large_copies(self):
 
189
        # We want to have a copy that is larger than 64kB, which forces us to
 
190
        # issue multiple copy instructions.
 
191
        big_text = _text3 * 1220
 
192
        delta = self.make_delta(big_text, big_text)
 
193
        self.assertDeltaIn(
 
194
            b'\xdc\x86\x0a'      # Encoding the length of the uncompressed text
 
195
            b'\x80'              # Copy 64kB, starting at byte 0
 
196
            b'\x84\x01'          # and another 64kB starting at 64kB
 
197
            b'\xb4\x02\x5c\x83',  # And the bit of tail.
 
198
            None,   # Both implementations should be identical
 
199
            delta)
 
200
 
 
201
    def test_apply_delta_is_typesafe(self):
 
202
        self.apply_delta(_text1, b'M\x90M')
 
203
        self.assertRaises(TypeError, self.apply_delta, object(), b'M\x90M')
 
204
        self.assertRaises(TypeError, self.apply_delta,
 
205
                          _text1.decode('latin1'), b'M\x90M')
 
206
        self.assertRaises(TypeError, self.apply_delta, _text1, u'M\x90M')
 
207
        self.assertRaises(TypeError, self.apply_delta, _text1, object())
 
208
 
 
209
    def test_apply_delta(self):
 
210
        target = self.apply_delta(_text1,
 
211
                                  b'N\x90/\x1fdiffer from\nagainst other text\n')
 
212
        self.assertEqual(_text2, target)
 
213
        target = self.apply_delta(_text2,
 
214
                                  b'M\x90/\x1ebe matched\nagainst other text\n')
 
215
        self.assertEqual(_text1, target)
 
216
 
 
217
    def test_apply_delta_to_source_is_safe(self):
 
218
        self.assertRaises(TypeError,
 
219
                          self.apply_delta_to_source, object(), 0, 1)
 
220
        self.assertRaises(TypeError,
 
221
                          self.apply_delta_to_source, u'unicode str', 0, 1)
 
222
        # end > length
 
223
        self.assertRaises(ValueError,
 
224
                          self.apply_delta_to_source, b'foo', 1, 4)
 
225
        # start > length
 
226
        self.assertRaises(ValueError,
 
227
                          self.apply_delta_to_source, b'foo', 5, 3)
 
228
        # start > end
 
229
        self.assertRaises(ValueError,
 
230
                          self.apply_delta_to_source, b'foo', 3, 2)
 
231
 
 
232
    def test_apply_delta_to_source(self):
 
233
        source_and_delta = (_text1
 
234
                            + b'N\x90/\x1fdiffer from\nagainst other text\n')
 
235
        self.assertEqual(_text2, self.apply_delta_to_source(source_and_delta,
 
236
                                                            len(_text1), len(source_and_delta)))
 
237
 
 
238
 
 
239
class TestMakeAndApplyCompatible(tests.TestCase):
 
240
 
 
241
    scenarios = two_way_scenarios()
 
242
 
 
243
    make_delta = None  # Set by load_tests
 
244
    apply_delta = None  # Set by load_tests
 
245
 
 
246
    def assertMakeAndApply(self, source, target):
 
247
        """Assert that generating a delta and applying gives success."""
 
248
        delta = self.make_delta(source, target)
 
249
        bytes = self.apply_delta(source, delta)
 
250
        self.assertEqualDiff(target, bytes)
 
251
 
 
252
    def test_direct(self):
 
253
        self.assertMakeAndApply(_text1, _text2)
 
254
        self.assertMakeAndApply(_text2, _text1)
 
255
        self.assertMakeAndApply(_text1, _text3)
 
256
        self.assertMakeAndApply(_text3, _text1)
 
257
        self.assertMakeAndApply(_text2, _text3)
 
258
        self.assertMakeAndApply(_text3, _text2)
 
259
 
 
260
 
 
261
class TestDeltaIndex(tests.TestCase):
 
262
 
 
263
    def setUp(self):
 
264
        super(TestDeltaIndex, self).setUp()
 
265
        # This test isn't multiplied, because we only have DeltaIndex for the
 
266
        # compiled form
 
267
        # We call this here, because _test_needs_features happens after setUp
 
268
        self.requireFeature(compiled_groupcompress_feature)
 
269
        self._gc_module = compiled_groupcompress_feature.module
 
270
 
 
271
    def test_repr(self):
 
272
        di = self._gc_module.DeltaIndex(b'test text\n')
 
273
        self.assertEqual('DeltaIndex(1, 10)', repr(di))
 
274
 
 
275
    def test_sizeof(self):
 
276
        di = self._gc_module.DeltaIndex()
 
277
        # Exact value will depend on platform but should include sources
 
278
        # source_info is a pointer and two longs so at least 12 bytes
 
279
        lower_bound = di._max_num_sources * 12
 
280
        self.assertGreater(sys.getsizeof(di), lower_bound)
 
281
 
 
282
    def test__dump_no_index(self):
 
283
        di = self._gc_module.DeltaIndex()
 
284
        self.assertEqual(None, di._dump_index())
 
285
 
 
286
    def test__dump_index_simple(self):
 
287
        di = self._gc_module.DeltaIndex()
 
288
        di.add_source(_text1, 0)
 
289
        self.assertFalse(di._has_index())
 
290
        self.assertEqual(None, di._dump_index())
 
291
        _ = di.make_delta(_text1)
 
292
        self.assertTrue(di._has_index())
 
293
        hash_list, entry_list = di._dump_index()
 
294
        self.assertEqual(16, len(hash_list))
 
295
        self.assertEqual(68, len(entry_list))
 
296
        just_entries = [(idx, text_offset, hash_val)
 
297
                        for idx, (text_offset, hash_val)
 
298
                        in enumerate(entry_list)
 
299
                        if text_offset != 0 or hash_val != 0]
 
300
        rabin_hash = self._gc_module._rabin_hash
 
301
        self.assertEqual([(8, 16, rabin_hash(_text1[1:17])),
 
302
                          (25, 48, rabin_hash(_text1[33:49])),
 
303
                          (34, 32, rabin_hash(_text1[17:33])),
 
304
                          (47, 64, rabin_hash(_text1[49:65])),
 
305
                          ], just_entries)
 
306
        # This ensures that the hash map points to the location we expect it to
 
307
        for entry_idx, text_offset, hash_val in just_entries:
 
308
            self.assertEqual(entry_idx, hash_list[hash_val & 0xf])
 
309
 
 
310
    def test__dump_index_two_sources(self):
 
311
        di = self._gc_module.DeltaIndex()
 
312
        di.add_source(_text1, 0)
 
313
        di.add_source(_text2, 2)
 
314
        start2 = len(_text1) + 2
 
315
        self.assertTrue(di._has_index())
 
316
        hash_list, entry_list = di._dump_index()
 
317
        self.assertEqual(16, len(hash_list))
 
318
        self.assertEqual(68, len(entry_list))
 
319
        just_entries = [(idx, text_offset, hash_val)
 
320
                        for idx, (text_offset, hash_val)
 
321
                        in enumerate(entry_list)
 
322
                        if text_offset != 0 or hash_val != 0]
 
323
        rabin_hash = self._gc_module._rabin_hash
 
324
        self.assertEqual([(8, 16, rabin_hash(_text1[1:17])),
 
325
                          (9, start2 + 16, rabin_hash(_text2[1:17])),
 
326
                          (25, 48, rabin_hash(_text1[33:49])),
 
327
                          (30, start2 + 64, rabin_hash(_text2[49:65])),
 
328
                          (34, 32, rabin_hash(_text1[17:33])),
 
329
                          (35, start2 + 32, rabin_hash(_text2[17:33])),
 
330
                          (43, start2 + 48, rabin_hash(_text2[33:49])),
 
331
                          (47, 64, rabin_hash(_text1[49:65])),
 
332
                          ], just_entries)
 
333
        # Each entry should be in the appropriate hash bucket.
 
334
        for entry_idx, text_offset, hash_val in just_entries:
 
335
            hash_idx = hash_val & 0xf
 
336
            self.assertTrue(
 
337
                hash_list[hash_idx] <= entry_idx < hash_list[hash_idx + 1])
 
338
 
 
339
    def test_first_add_source_doesnt_index_until_make_delta(self):
 
340
        di = self._gc_module.DeltaIndex()
 
341
        self.assertFalse(di._has_index())
 
342
        di.add_source(_text1, 0)
 
343
        self.assertFalse(di._has_index())
 
344
        # However, asking to make a delta will trigger the index to be
 
345
        # generated, and will generate a proper delta
 
346
        delta = di.make_delta(_text2)
 
347
        self.assertTrue(di._has_index())
 
348
        self.assertEqual(b'N\x90/\x1fdiffer from\nagainst other text\n', delta)
 
349
 
 
350
    def test_add_source_max_bytes_to_index(self):
 
351
        di = self._gc_module.DeltaIndex()
 
352
        di._max_bytes_to_index = 3 * 16
 
353
        di.add_source(_text1, 0)  # (77 bytes -1) // 3 = 25 byte stride
 
354
        di.add_source(_text3, 3)  # (135 bytes -1) // 3 = 44 byte stride
 
355
        start2 = len(_text1) + 3
 
356
        hash_list, entry_list = di._dump_index()
 
357
        self.assertEqual(16, len(hash_list))
 
358
        self.assertEqual(67, len(entry_list))
 
359
        just_entries = sorted([(text_offset, hash_val)
 
360
                               for text_offset, hash_val in entry_list
 
361
                               if text_offset != 0 or hash_val != 0])
 
362
        rabin_hash = self._gc_module._rabin_hash
 
363
        self.assertEqual([(25, rabin_hash(_text1[10:26])),
 
364
                          (50, rabin_hash(_text1[35:51])),
 
365
                          (75, rabin_hash(_text1[60:76])),
 
366
                          (start2 + 44, rabin_hash(_text3[29:45])),
 
367
                          (start2 + 88, rabin_hash(_text3[73:89])),
 
368
                          (start2 + 132, rabin_hash(_text3[117:133])),
 
369
                          ], just_entries)
 
370
 
 
371
    def test_second_add_source_triggers_make_index(self):
 
372
        di = self._gc_module.DeltaIndex()
 
373
        self.assertFalse(di._has_index())
 
374
        di.add_source(_text1, 0)
 
375
        self.assertFalse(di._has_index())
 
376
        di.add_source(_text2, 0)
 
377
        self.assertTrue(di._has_index())
 
378
 
 
379
    def test_make_delta(self):
 
380
        di = self._gc_module.DeltaIndex(_text1)
 
381
        delta = di.make_delta(_text2)
 
382
        self.assertEqual(b'N\x90/\x1fdiffer from\nagainst other text\n', delta)
 
383
 
 
384
    def test_delta_against_multiple_sources(self):
 
385
        di = self._gc_module.DeltaIndex()
 
386
        di.add_source(_first_text, 0)
 
387
        self.assertEqual(len(_first_text), di._source_offset)
 
388
        di.add_source(_second_text, 0)
 
389
        self.assertEqual(len(_first_text) + len(_second_text),
 
390
                         di._source_offset)
 
391
        delta = di.make_delta(_third_text)
 
392
        result = self._gc_module.apply_delta(_first_text + _second_text, delta)
 
393
        self.assertEqualDiff(_third_text, result)
 
394
        self.assertEqual(b'\x85\x01\x90\x14\x0chas some in '
 
395
                         b'\x91v6\x03and\x91d"\x91:\n', delta)
 
396
 
 
397
    def test_delta_with_offsets(self):
 
398
        di = self._gc_module.DeltaIndex()
 
399
        di.add_source(_first_text, 5)
 
400
        self.assertEqual(len(_first_text) + 5, di._source_offset)
 
401
        di.add_source(_second_text, 10)
 
402
        self.assertEqual(len(_first_text) + len(_second_text) + 15,
 
403
                         di._source_offset)
 
404
        delta = di.make_delta(_third_text)
 
405
        self.assertIsNot(None, delta)
 
406
        result = self._gc_module.apply_delta(
 
407
            b'12345' + _first_text + b'1234567890' + _second_text, delta)
 
408
        self.assertIsNot(None, result)
 
409
        self.assertEqualDiff(_third_text, result)
 
410
        self.assertEqual(b'\x85\x01\x91\x05\x14\x0chas some in '
 
411
                         b'\x91\x856\x03and\x91s"\x91?\n', delta)
 
412
 
 
413
    def test_delta_with_delta_bytes(self):
 
414
        di = self._gc_module.DeltaIndex()
 
415
        source = _first_text
 
416
        di.add_source(_first_text, 0)
 
417
        self.assertEqual(len(_first_text), di._source_offset)
 
418
        delta = di.make_delta(_second_text)
 
419
        self.assertEqual(b'h\tsome more\x91\x019'
 
420
                         b'&previous text\nand has some extra text\n', delta)
 
421
        di.add_delta_source(delta, 0)
 
422
        source += delta
 
423
        self.assertEqual(len(_first_text) + len(delta), di._source_offset)
 
424
        second_delta = di.make_delta(_third_text)
 
425
        result = self._gc_module.apply_delta(source, second_delta)
 
426
        self.assertEqualDiff(_third_text, result)
 
427
        # We should be able to match against the
 
428
        # 'previous text\nand has some...'  that was part of the delta bytes
 
429
        # Note that we don't match the 'common with the', because it isn't long
 
430
        # enough to match in the original text, and those bytes are not present
 
431
        # in the delta for the second text.
 
432
        self.assertEqual(b'\x85\x01\x90\x14\x1chas some in common with the '
 
433
                         b'\x91S&\x03and\x91\x18,', second_delta)
 
434
        # Add this delta, and create a new delta for the same text. We should
 
435
        # find the remaining text, and only insert the short 'and' text.
 
436
        di.add_delta_source(second_delta, 0)
 
437
        source += second_delta
 
438
        third_delta = di.make_delta(_third_text)
 
439
        result = self._gc_module.apply_delta(source, third_delta)
 
440
        self.assertEqualDiff(_third_text, result)
 
441
        self.assertEqual(b'\x85\x01\x90\x14\x91\x7e\x1c'
 
442
                         b'\x91S&\x03and\x91\x18,', third_delta)
 
443
        # Now create a delta, which we know won't be able to be 'fit' into the
 
444
        # existing index
 
445
        fourth_delta = di.make_delta(_fourth_text)
 
446
        self.assertEqual(_fourth_text,
 
447
                         self._gc_module.apply_delta(source, fourth_delta))
 
448
        self.assertEqual(b'\x80\x01'
 
449
                         b'\x7f123456789012345\nsame rabin hash\n'
 
450
                         b'123456789012345\nsame rabin hash\n'
 
451
                         b'123456789012345\nsame rabin hash\n'
 
452
                         b'123456789012345\nsame rabin hash'
 
453
                         b'\x01\n', fourth_delta)
 
454
        di.add_delta_source(fourth_delta, 0)
 
455
        source += fourth_delta
 
456
        # With the next delta, everything should be found
 
457
        fifth_delta = di.make_delta(_fourth_text)
 
458
        self.assertEqual(_fourth_text,
 
459
                         self._gc_module.apply_delta(source, fifth_delta))
 
460
        self.assertEqual(b'\x80\x01\x91\xa7\x7f\x01\n', fifth_delta)
 
461
 
 
462
 
 
463
class TestCopyInstruction(tests.TestCase):
 
464
 
 
465
    def assertEncode(self, expected, offset, length):
 
466
        data = _groupcompress_py.encode_copy_instruction(offset, length)
 
467
        self.assertEqual(expected, data)
 
468
 
 
469
    def assertDecode(self, exp_offset, exp_length, exp_newpos, data, pos):
 
470
        cmd = data[pos]
 
471
        pos += 1
 
472
        out = _groupcompress_py.decode_copy_instruction(data, cmd, pos)
 
473
        self.assertEqual((exp_offset, exp_length, exp_newpos), out)
 
474
 
 
475
    def test_encode_no_length(self):
 
476
        self.assertEncode(b'\x80', 0, 64 * 1024)
 
477
        self.assertEncode(b'\x81\x01', 1, 64 * 1024)
 
478
        self.assertEncode(b'\x81\x0a', 10, 64 * 1024)
 
479
        self.assertEncode(b'\x81\xff', 255, 64 * 1024)
 
480
        self.assertEncode(b'\x82\x01', 256, 64 * 1024)
 
481
        self.assertEncode(b'\x83\x01\x01', 257, 64 * 1024)
 
482
        self.assertEncode(b'\x8F\xff\xff\xff\xff', 0xFFFFFFFF, 64 * 1024)
 
483
        self.assertEncode(b'\x8E\xff\xff\xff', 0xFFFFFF00, 64 * 1024)
 
484
        self.assertEncode(b'\x8D\xff\xff\xff', 0xFFFF00FF, 64 * 1024)
 
485
        self.assertEncode(b'\x8B\xff\xff\xff', 0xFF00FFFF, 64 * 1024)
 
486
        self.assertEncode(b'\x87\xff\xff\xff', 0x00FFFFFF, 64 * 1024)
 
487
        self.assertEncode(b'\x8F\x04\x03\x02\x01', 0x01020304, 64 * 1024)
 
488
 
 
489
    def test_encode_no_offset(self):
 
490
        self.assertEncode(b'\x90\x01', 0, 1)
 
491
        self.assertEncode(b'\x90\x0a', 0, 10)
 
492
        self.assertEncode(b'\x90\xff', 0, 255)
 
493
        self.assertEncode(b'\xA0\x01', 0, 256)
 
494
        self.assertEncode(b'\xB0\x01\x01', 0, 257)
 
495
        self.assertEncode(b'\xB0\xff\xff', 0, 0xFFFF)
 
496
        # Special case, if copy == 64KiB, then we store exactly 0
 
497
        # Note that this puns with a copy of exactly 0 bytes, but we don't care
 
498
        # about that, as we would never actually copy 0 bytes
 
499
        self.assertEncode(b'\x80', 0, 64 * 1024)
 
500
 
 
501
    def test_encode(self):
 
502
        self.assertEncode(b'\x91\x01\x01', 1, 1)
 
503
        self.assertEncode(b'\x91\x09\x0a', 9, 10)
 
504
        self.assertEncode(b'\x91\xfe\xff', 254, 255)
 
505
        self.assertEncode(b'\xA2\x02\x01', 512, 256)
 
506
        self.assertEncode(b'\xB3\x02\x01\x01\x01', 258, 257)
 
507
        self.assertEncode(b'\xB0\x01\x01', 0, 257)
 
508
        # Special case, if copy == 64KiB, then we store exactly 0
 
509
        # Note that this puns with a copy of exactly 0 bytes, but we don't care
 
510
        # about that, as we would never actually copy 0 bytes
 
511
        self.assertEncode(b'\x81\x0a', 10, 64 * 1024)
 
512
 
 
513
    def test_decode_no_length(self):
 
514
        # If length is 0, it is interpreted as 64KiB
 
515
        # The shortest possible instruction is a copy of 64KiB from offset 0
 
516
        self.assertDecode(0, 65536, 1, b'\x80', 0)
 
517
        self.assertDecode(1, 65536, 2, b'\x81\x01', 0)
 
518
        self.assertDecode(10, 65536, 2, b'\x81\x0a', 0)
 
519
        self.assertDecode(255, 65536, 2, b'\x81\xff', 0)
 
520
        self.assertDecode(256, 65536, 2, b'\x82\x01', 0)
 
521
        self.assertDecode(257, 65536, 3, b'\x83\x01\x01', 0)
 
522
        self.assertDecode(0xFFFFFFFF, 65536, 5, b'\x8F\xff\xff\xff\xff', 0)
 
523
        self.assertDecode(0xFFFFFF00, 65536, 4, b'\x8E\xff\xff\xff', 0)
 
524
        self.assertDecode(0xFFFF00FF, 65536, 4, b'\x8D\xff\xff\xff', 0)
 
525
        self.assertDecode(0xFF00FFFF, 65536, 4, b'\x8B\xff\xff\xff', 0)
 
526
        self.assertDecode(0x00FFFFFF, 65536, 4, b'\x87\xff\xff\xff', 0)
 
527
        self.assertDecode(0x01020304, 65536, 5, b'\x8F\x04\x03\x02\x01', 0)
 
528
 
 
529
    def test_decode_no_offset(self):
 
530
        self.assertDecode(0, 1, 2, b'\x90\x01', 0)
 
531
        self.assertDecode(0, 10, 2, b'\x90\x0a', 0)
 
532
        self.assertDecode(0, 255, 2, b'\x90\xff', 0)
 
533
        self.assertDecode(0, 256, 2, b'\xA0\x01', 0)
 
534
        self.assertDecode(0, 257, 3, b'\xB0\x01\x01', 0)
 
535
        self.assertDecode(0, 65535, 3, b'\xB0\xff\xff', 0)
 
536
        # Special case, if copy == 64KiB, then we store exactly 0
 
537
        # Note that this puns with a copy of exactly 0 bytes, but we don't care
 
538
        # about that, as we would never actually copy 0 bytes
 
539
        self.assertDecode(0, 65536, 1, b'\x80', 0)
 
540
 
 
541
    def test_decode(self):
 
542
        self.assertDecode(1, 1, 3, b'\x91\x01\x01', 0)
 
543
        self.assertDecode(9, 10, 3, b'\x91\x09\x0a', 0)
 
544
        self.assertDecode(254, 255, 3, b'\x91\xfe\xff', 0)
 
545
        self.assertDecode(512, 256, 3, b'\xA2\x02\x01', 0)
 
546
        self.assertDecode(258, 257, 5, b'\xB3\x02\x01\x01\x01', 0)
 
547
        self.assertDecode(0, 257, 3, b'\xB0\x01\x01', 0)
 
548
 
 
549
    def test_decode_not_start(self):
 
550
        self.assertDecode(1, 1, 6, b'abc\x91\x01\x01def', 3)
 
551
        self.assertDecode(9, 10, 5, b'ab\x91\x09\x0ade', 2)
 
552
        self.assertDecode(254, 255, 6, b'not\x91\xfe\xffcopy', 3)
 
553
 
 
554
 
 
555
class TestBase128Int(tests.TestCase):
 
556
 
 
557
    scenarios = module_scenarios()
 
558
 
 
559
    _gc_module = None  # Set by load_tests
 
560
 
 
561
    def assertEqualEncode(self, bytes, val):
 
562
        self.assertEqual(bytes, self._gc_module.encode_base128_int(val))
 
563
 
 
564
    def assertEqualDecode(self, val, num_decode, bytes):
 
565
        self.assertEqual((val, num_decode),
 
566
                         self._gc_module.decode_base128_int(bytes))
 
567
 
 
568
    def test_encode(self):
 
569
        self.assertEqualEncode(b'\x01', 1)
 
570
        self.assertEqualEncode(b'\x02', 2)
 
571
        self.assertEqualEncode(b'\x7f', 127)
 
572
        self.assertEqualEncode(b'\x80\x01', 128)
 
573
        self.assertEqualEncode(b'\xff\x01', 255)
 
574
        self.assertEqualEncode(b'\x80\x02', 256)
 
575
        self.assertEqualEncode(b'\xff\xff\xff\xff\x0f', 0xFFFFFFFF)
 
576
 
 
577
    def test_decode(self):
 
578
        self.assertEqualDecode(1, 1, b'\x01')
 
579
        self.assertEqualDecode(2, 1, b'\x02')
 
580
        self.assertEqualDecode(127, 1, b'\x7f')
 
581
        self.assertEqualDecode(128, 2, b'\x80\x01')
 
582
        self.assertEqualDecode(255, 2, b'\xff\x01')
 
583
        self.assertEqualDecode(256, 2, b'\x80\x02')
 
584
        self.assertEqualDecode(0xFFFFFFFF, 5, b'\xff\xff\xff\xff\x0f')
 
585
 
 
586
    def test_decode_with_trailing_bytes(self):
 
587
        self.assertEqualDecode(1, 1, b'\x01abcdef')
 
588
        self.assertEqualDecode(127, 1, b'\x7f\x01')
 
589
        self.assertEqualDecode(128, 2, b'\x80\x01abcdef')
 
590
        self.assertEqualDecode(255, 2, b'\xff\x01\xff')