/brz/remove-bazaar

To get this branch, use:
bzr branch http://gegoxaren.bato24.eu/bzr/brz/remove-bazaar

« back to all changes in this revision

Viewing changes to breezy/tests/test__groupcompress.py

  • Committer: Breezy landing bot
  • Author(s): Jelmer Vernooij
  • Date: 2019-02-14 03:30:18 UTC
  • mfrom: (6745.1.3 test-file-ids)
  • Revision ID: breezy.the.bot@gmail.com-20190214033018-4mhv416kiuozgned
Fix a commonly typoed word: compatibility.

Merged from https://code.launchpad.net/~jelmer/brz/compatibility-typos/+merge/363008

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
# Copyright (C) 2008-2011 Canonical Ltd
 
2
#
 
3
# This program is free software; you can redistribute it and/or modify
 
4
# it under the terms of the GNU General Public License as published by
 
5
# the Free Software Foundation; either version 2 of the License, or
 
6
# (at your option) any later version.
 
7
#
 
8
# This program is distributed in the hope that it will be useful,
 
9
# but WITHOUT ANY WARRANTY; without even the implied warranty of
 
10
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
11
# GNU General Public License for more details.
 
12
#
 
13
# You should have received a copy of the GNU General Public License
 
14
# along with this program; if not, write to the Free Software
 
15
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
 
16
 
 
17
"""Tests for the python and pyrex extensions of groupcompress"""
 
18
 
 
19
import sys
 
20
 
 
21
from .. import (
 
22
    tests,
 
23
    )
 
24
from ..bzr import (
 
25
    _groupcompress_py,
 
26
    )
 
27
from .scenarios import (
 
28
    load_tests_apply_scenarios,
 
29
    )
 
30
from ..sixish import (
 
31
    indexbytes,
 
32
    )
 
33
from . import (
 
34
    features,
 
35
    )
 
36
 
 
37
 
 
38
def module_scenarios():
 
39
    scenarios = [
 
40
        ('python', {'_gc_module': _groupcompress_py}),
 
41
        ]
 
42
    if compiled_groupcompress_feature.available():
 
43
        gc_module = compiled_groupcompress_feature.module
 
44
        scenarios.append(('C',
 
45
                          {'_gc_module': gc_module}))
 
46
    return scenarios
 
47
 
 
48
 
 
49
def two_way_scenarios():
 
50
    scenarios = [
 
51
        ('PP', {'make_delta': _groupcompress_py.make_delta,
 
52
                'apply_delta': _groupcompress_py.apply_delta})
 
53
        ]
 
54
    if compiled_groupcompress_feature.available():
 
55
        gc_module = compiled_groupcompress_feature.module
 
56
        scenarios.extend([
 
57
            ('CC', {'make_delta': gc_module.make_delta,
 
58
                    'apply_delta': gc_module.apply_delta}),
 
59
            ('PC', {'make_delta': _groupcompress_py.make_delta,
 
60
                    'apply_delta': gc_module.apply_delta}),
 
61
            ('CP', {'make_delta': gc_module.make_delta,
 
62
                    'apply_delta': _groupcompress_py.apply_delta}),
 
63
            ])
 
64
    return scenarios
 
65
 
 
66
 
 
67
load_tests = load_tests_apply_scenarios
 
68
 
 
69
 
 
70
compiled_groupcompress_feature = features.ModuleAvailableFeature(
 
71
    'breezy.bzr._groupcompress_pyx')
 
72
 
 
73
_text1 = b"""\
 
74
This is a bit
 
75
of source text
 
76
which is meant to be matched
 
77
against other text
 
78
"""
 
79
 
 
80
_text2 = b"""\
 
81
This is a bit
 
82
of source text
 
83
which is meant to differ from
 
84
against other text
 
85
"""
 
86
 
 
87
_text3 = b"""\
 
88
This is a bit
 
89
of source text
 
90
which is meant to be matched
 
91
against other text
 
92
except it also
 
93
has a lot more data
 
94
at the end of the file
 
95
"""
 
96
 
 
97
_first_text = b"""\
 
98
a bit of text, that
 
99
does not have much in
 
100
common with the next text
 
101
"""
 
102
 
 
103
_second_text = b"""\
 
104
some more bit of text, that
 
105
does not have much in
 
106
common with the previous text
 
107
and has some extra text
 
108
"""
 
109
 
 
110
 
 
111
_third_text = b"""\
 
112
a bit of text, that
 
113
has some in common with the previous text
 
114
and has some extra text
 
115
and not have much in
 
116
common with the next text
 
117
"""
 
118
 
 
119
_fourth_text = b"""\
 
120
123456789012345
 
121
same rabin hash
 
122
123456789012345
 
123
same rabin hash
 
124
123456789012345
 
125
same rabin hash
 
126
123456789012345
 
127
same rabin hash
 
128
"""
 
129
 
 
130
 
 
131
class TestMakeAndApplyDelta(tests.TestCase):
 
132
 
 
133
    scenarios = module_scenarios()
 
134
    _gc_module = None  # Set by load_tests
 
135
 
 
136
    def setUp(self):
 
137
        super(TestMakeAndApplyDelta, self).setUp()
 
138
        self.make_delta = self._gc_module.make_delta
 
139
        self.apply_delta = self._gc_module.apply_delta
 
140
        self.apply_delta_to_source = self._gc_module.apply_delta_to_source
 
141
 
 
142
    def test_make_delta_is_typesafe(self):
 
143
        self.make_delta(b'a string', b'another string')
 
144
 
 
145
        def _check_make_delta(string1, string2):
 
146
            self.assertRaises(TypeError, self.make_delta, string1, string2)
 
147
 
 
148
        _check_make_delta(b'a string', object())
 
149
        _check_make_delta(b'a string', u'not a string')
 
150
        _check_make_delta(object(), b'a string')
 
151
        _check_make_delta(u'not a string', b'a string')
 
152
 
 
153
    def test_make_noop_delta(self):
 
154
        ident_delta = self.make_delta(_text1, _text1)
 
155
        self.assertEqual(b'M\x90M', ident_delta)
 
156
        ident_delta = self.make_delta(_text2, _text2)
 
157
        self.assertEqual(b'N\x90N', ident_delta)
 
158
        ident_delta = self.make_delta(_text3, _text3)
 
159
        self.assertEqual(b'\x87\x01\x90\x87', ident_delta)
 
160
 
 
161
    def assertDeltaIn(self, delta1, delta2, delta):
 
162
        """Make sure that the delta bytes match one of the expectations."""
 
163
        # In general, the python delta matcher gives different results than the
 
164
        # pyrex delta matcher. Both should be valid deltas, though.
 
165
        if delta not in (delta1, delta2):
 
166
            self.fail(b"Delta bytes:\n"
 
167
                      b"       %r\n"
 
168
                      b"not in %r\n"
 
169
                      b"    or %r"
 
170
                      % (delta, delta1, delta2))
 
171
 
 
172
    def test_make_delta(self):
 
173
        delta = self.make_delta(_text1, _text2)
 
174
        self.assertDeltaIn(
 
175
            b'N\x90/\x1fdiffer from\nagainst other text\n',
 
176
            b'N\x90\x1d\x1ewhich is meant to differ from\n\x91:\x13',
 
177
            delta)
 
178
        delta = self.make_delta(_text2, _text1)
 
179
        self.assertDeltaIn(
 
180
            b'M\x90/\x1ebe matched\nagainst other text\n',
 
181
            b'M\x90\x1d\x1dwhich is meant to be matched\n\x91;\x13',
 
182
            delta)
 
183
        delta = self.make_delta(_text3, _text1)
 
184
        self.assertEqual(b'M\x90M', delta)
 
185
        delta = self.make_delta(_text3, _text2)
 
186
        self.assertDeltaIn(
 
187
            b'N\x90/\x1fdiffer from\nagainst other text\n',
 
188
            b'N\x90\x1d\x1ewhich is meant to differ from\n\x91:\x13',
 
189
            delta)
 
190
 
 
191
    def test_make_delta_with_large_copies(self):
 
192
        # We want to have a copy that is larger than 64kB, which forces us to
 
193
        # issue multiple copy instructions.
 
194
        big_text = _text3 * 1220
 
195
        delta = self.make_delta(big_text, big_text)
 
196
        self.assertDeltaIn(
 
197
            b'\xdc\x86\x0a'      # Encoding the length of the uncompressed text
 
198
            b'\x80'              # Copy 64kB, starting at byte 0
 
199
            b'\x84\x01'          # and another 64kB starting at 64kB
 
200
            b'\xb4\x02\x5c\x83',  # And the bit of tail.
 
201
            None,   # Both implementations should be identical
 
202
            delta)
 
203
 
 
204
    def test_apply_delta_is_typesafe(self):
 
205
        self.apply_delta(_text1, b'M\x90M')
 
206
        self.assertRaises(TypeError, self.apply_delta, object(), b'M\x90M')
 
207
        self.assertRaises(TypeError, self.apply_delta,
 
208
                          _text1.decode('latin1'), b'M\x90M')
 
209
        self.assertRaises(TypeError, self.apply_delta, _text1, u'M\x90M')
 
210
        self.assertRaises(TypeError, self.apply_delta, _text1, object())
 
211
 
 
212
    def test_apply_delta(self):
 
213
        target = self.apply_delta(_text1,
 
214
                                  b'N\x90/\x1fdiffer from\nagainst other text\n')
 
215
        self.assertEqual(_text2, target)
 
216
        target = self.apply_delta(_text2,
 
217
                                  b'M\x90/\x1ebe matched\nagainst other text\n')
 
218
        self.assertEqual(_text1, target)
 
219
 
 
220
    def test_apply_delta_to_source_is_safe(self):
 
221
        self.assertRaises(TypeError,
 
222
                          self.apply_delta_to_source, object(), 0, 1)
 
223
        self.assertRaises(TypeError,
 
224
                          self.apply_delta_to_source, u'unicode str', 0, 1)
 
225
        # end > length
 
226
        self.assertRaises(ValueError,
 
227
                          self.apply_delta_to_source, b'foo', 1, 4)
 
228
        # start > length
 
229
        self.assertRaises(ValueError,
 
230
                          self.apply_delta_to_source, b'foo', 5, 3)
 
231
        # start > end
 
232
        self.assertRaises(ValueError,
 
233
                          self.apply_delta_to_source, b'foo', 3, 2)
 
234
 
 
235
    def test_apply_delta_to_source(self):
 
236
        source_and_delta = (_text1
 
237
                            + b'N\x90/\x1fdiffer from\nagainst other text\n')
 
238
        self.assertEqual(_text2, self.apply_delta_to_source(source_and_delta,
 
239
                                                            len(_text1), len(source_and_delta)))
 
240
 
 
241
 
 
242
class TestMakeAndApplyCompatible(tests.TestCase):
 
243
 
 
244
    scenarios = two_way_scenarios()
 
245
 
 
246
    make_delta = None  # Set by load_tests
 
247
    apply_delta = None  # Set by load_tests
 
248
 
 
249
    def assertMakeAndApply(self, source, target):
 
250
        """Assert that generating a delta and applying gives success."""
 
251
        delta = self.make_delta(source, target)
 
252
        bytes = self.apply_delta(source, delta)
 
253
        self.assertEqualDiff(target, bytes)
 
254
 
 
255
    def test_direct(self):
 
256
        self.assertMakeAndApply(_text1, _text2)
 
257
        self.assertMakeAndApply(_text2, _text1)
 
258
        self.assertMakeAndApply(_text1, _text3)
 
259
        self.assertMakeAndApply(_text3, _text1)
 
260
        self.assertMakeAndApply(_text2, _text3)
 
261
        self.assertMakeAndApply(_text3, _text2)
 
262
 
 
263
 
 
264
class TestDeltaIndex(tests.TestCase):
 
265
 
 
266
    def setUp(self):
 
267
        super(TestDeltaIndex, self).setUp()
 
268
        # This test isn't multiplied, because we only have DeltaIndex for the
 
269
        # compiled form
 
270
        # We call this here, because _test_needs_features happens after setUp
 
271
        self.requireFeature(compiled_groupcompress_feature)
 
272
        self._gc_module = compiled_groupcompress_feature.module
 
273
 
 
274
    def test_repr(self):
 
275
        di = self._gc_module.DeltaIndex(b'test text\n')
 
276
        self.assertEqual('DeltaIndex(1, 10)', repr(di))
 
277
 
 
278
    def test_sizeof(self):
 
279
        di = self._gc_module.DeltaIndex()
 
280
        # Exact value will depend on platform but should include sources
 
281
        # source_info is a pointer and two longs so at least 12 bytes
 
282
        lower_bound = di._max_num_sources * 12
 
283
        self.assertGreater(sys.getsizeof(di), lower_bound)
 
284
 
 
285
    def test__dump_no_index(self):
 
286
        di = self._gc_module.DeltaIndex()
 
287
        self.assertEqual(None, di._dump_index())
 
288
 
 
289
    def test__dump_index_simple(self):
 
290
        di = self._gc_module.DeltaIndex()
 
291
        di.add_source(_text1, 0)
 
292
        self.assertFalse(di._has_index())
 
293
        self.assertEqual(None, di._dump_index())
 
294
        _ = di.make_delta(_text1)
 
295
        self.assertTrue(di._has_index())
 
296
        hash_list, entry_list = di._dump_index()
 
297
        self.assertEqual(16, len(hash_list))
 
298
        self.assertEqual(68, len(entry_list))
 
299
        just_entries = [(idx, text_offset, hash_val)
 
300
                        for idx, (text_offset, hash_val)
 
301
                        in enumerate(entry_list)
 
302
                        if text_offset != 0 or hash_val != 0]
 
303
        rabin_hash = self._gc_module._rabin_hash
 
304
        self.assertEqual([(8, 16, rabin_hash(_text1[1:17])),
 
305
                          (25, 48, rabin_hash(_text1[33:49])),
 
306
                          (34, 32, rabin_hash(_text1[17:33])),
 
307
                          (47, 64, rabin_hash(_text1[49:65])),
 
308
                          ], just_entries)
 
309
        # This ensures that the hash map points to the location we expect it to
 
310
        for entry_idx, text_offset, hash_val in just_entries:
 
311
            self.assertEqual(entry_idx, hash_list[hash_val & 0xf])
 
312
 
 
313
    def test__dump_index_two_sources(self):
 
314
        di = self._gc_module.DeltaIndex()
 
315
        di.add_source(_text1, 0)
 
316
        di.add_source(_text2, 2)
 
317
        start2 = len(_text1) + 2
 
318
        self.assertTrue(di._has_index())
 
319
        hash_list, entry_list = di._dump_index()
 
320
        self.assertEqual(16, len(hash_list))
 
321
        self.assertEqual(68, len(entry_list))
 
322
        just_entries = [(idx, text_offset, hash_val)
 
323
                        for idx, (text_offset, hash_val)
 
324
                        in enumerate(entry_list)
 
325
                        if text_offset != 0 or hash_val != 0]
 
326
        rabin_hash = self._gc_module._rabin_hash
 
327
        self.assertEqual([(8, 16, rabin_hash(_text1[1:17])),
 
328
                          (9, start2 + 16, rabin_hash(_text2[1:17])),
 
329
                          (25, 48, rabin_hash(_text1[33:49])),
 
330
                          (30, start2 + 64, rabin_hash(_text2[49:65])),
 
331
                          (34, 32, rabin_hash(_text1[17:33])),
 
332
                          (35, start2 + 32, rabin_hash(_text2[17:33])),
 
333
                          (43, start2 + 48, rabin_hash(_text2[33:49])),
 
334
                          (47, 64, rabin_hash(_text1[49:65])),
 
335
                          ], just_entries)
 
336
        # Each entry should be in the appropriate hash bucket.
 
337
        for entry_idx, text_offset, hash_val in just_entries:
 
338
            hash_idx = hash_val & 0xf
 
339
            self.assertTrue(
 
340
                hash_list[hash_idx] <= entry_idx < hash_list[hash_idx + 1])
 
341
 
 
342
    def test_first_add_source_doesnt_index_until_make_delta(self):
 
343
        di = self._gc_module.DeltaIndex()
 
344
        self.assertFalse(di._has_index())
 
345
        di.add_source(_text1, 0)
 
346
        self.assertFalse(di._has_index())
 
347
        # However, asking to make a delta will trigger the index to be
 
348
        # generated, and will generate a proper delta
 
349
        delta = di.make_delta(_text2)
 
350
        self.assertTrue(di._has_index())
 
351
        self.assertEqual(b'N\x90/\x1fdiffer from\nagainst other text\n', delta)
 
352
 
 
353
    def test_add_source_max_bytes_to_index(self):
 
354
        di = self._gc_module.DeltaIndex()
 
355
        di._max_bytes_to_index = 3 * 16
 
356
        di.add_source(_text1, 0)  # (77 bytes -1) // 3 = 25 byte stride
 
357
        di.add_source(_text3, 3)  # (135 bytes -1) // 3 = 44 byte stride
 
358
        start2 = len(_text1) + 3
 
359
        hash_list, entry_list = di._dump_index()
 
360
        self.assertEqual(16, len(hash_list))
 
361
        self.assertEqual(67, len(entry_list))
 
362
        just_entries = sorted([(text_offset, hash_val)
 
363
                               for text_offset, hash_val in entry_list
 
364
                               if text_offset != 0 or hash_val != 0])
 
365
        rabin_hash = self._gc_module._rabin_hash
 
366
        self.assertEqual([(25, rabin_hash(_text1[10:26])),
 
367
                          (50, rabin_hash(_text1[35:51])),
 
368
                          (75, rabin_hash(_text1[60:76])),
 
369
                          (start2 + 44, rabin_hash(_text3[29:45])),
 
370
                          (start2 + 88, rabin_hash(_text3[73:89])),
 
371
                          (start2 + 132, rabin_hash(_text3[117:133])),
 
372
                          ], just_entries)
 
373
 
 
374
    def test_second_add_source_triggers_make_index(self):
 
375
        di = self._gc_module.DeltaIndex()
 
376
        self.assertFalse(di._has_index())
 
377
        di.add_source(_text1, 0)
 
378
        self.assertFalse(di._has_index())
 
379
        di.add_source(_text2, 0)
 
380
        self.assertTrue(di._has_index())
 
381
 
 
382
    def test_make_delta(self):
 
383
        di = self._gc_module.DeltaIndex(_text1)
 
384
        delta = di.make_delta(_text2)
 
385
        self.assertEqual(b'N\x90/\x1fdiffer from\nagainst other text\n', delta)
 
386
 
 
387
    def test_delta_against_multiple_sources(self):
 
388
        di = self._gc_module.DeltaIndex()
 
389
        di.add_source(_first_text, 0)
 
390
        self.assertEqual(len(_first_text), di._source_offset)
 
391
        di.add_source(_second_text, 0)
 
392
        self.assertEqual(len(_first_text) + len(_second_text),
 
393
                         di._source_offset)
 
394
        delta = di.make_delta(_third_text)
 
395
        result = self._gc_module.apply_delta(_first_text + _second_text, delta)
 
396
        self.assertEqualDiff(_third_text, result)
 
397
        self.assertEqual(b'\x85\x01\x90\x14\x0chas some in '
 
398
                         b'\x91v6\x03and\x91d"\x91:\n', delta)
 
399
 
 
400
    def test_delta_with_offsets(self):
 
401
        di = self._gc_module.DeltaIndex()
 
402
        di.add_source(_first_text, 5)
 
403
        self.assertEqual(len(_first_text) + 5, di._source_offset)
 
404
        di.add_source(_second_text, 10)
 
405
        self.assertEqual(len(_first_text) + len(_second_text) + 15,
 
406
                         di._source_offset)
 
407
        delta = di.make_delta(_third_text)
 
408
        self.assertIsNot(None, delta)
 
409
        result = self._gc_module.apply_delta(
 
410
            b'12345' + _first_text + b'1234567890' + _second_text, delta)
 
411
        self.assertIsNot(None, result)
 
412
        self.assertEqualDiff(_third_text, result)
 
413
        self.assertEqual(b'\x85\x01\x91\x05\x14\x0chas some in '
 
414
                         b'\x91\x856\x03and\x91s"\x91?\n', delta)
 
415
 
 
416
    def test_delta_with_delta_bytes(self):
 
417
        di = self._gc_module.DeltaIndex()
 
418
        source = _first_text
 
419
        di.add_source(_first_text, 0)
 
420
        self.assertEqual(len(_first_text), di._source_offset)
 
421
        delta = di.make_delta(_second_text)
 
422
        self.assertEqual(b'h\tsome more\x91\x019'
 
423
                         b'&previous text\nand has some extra text\n', delta)
 
424
        di.add_delta_source(delta, 0)
 
425
        source += delta
 
426
        self.assertEqual(len(_first_text) + len(delta), di._source_offset)
 
427
        second_delta = di.make_delta(_third_text)
 
428
        result = self._gc_module.apply_delta(source, second_delta)
 
429
        self.assertEqualDiff(_third_text, result)
 
430
        # We should be able to match against the
 
431
        # 'previous text\nand has some...'  that was part of the delta bytes
 
432
        # Note that we don't match the 'common with the', because it isn't long
 
433
        # enough to match in the original text, and those bytes are not present
 
434
        # in the delta for the second text.
 
435
        self.assertEqual(b'\x85\x01\x90\x14\x1chas some in common with the '
 
436
                         b'\x91S&\x03and\x91\x18,', second_delta)
 
437
        # Add this delta, and create a new delta for the same text. We should
 
438
        # find the remaining text, and only insert the short 'and' text.
 
439
        di.add_delta_source(second_delta, 0)
 
440
        source += second_delta
 
441
        third_delta = di.make_delta(_third_text)
 
442
        result = self._gc_module.apply_delta(source, third_delta)
 
443
        self.assertEqualDiff(_third_text, result)
 
444
        self.assertEqual(b'\x85\x01\x90\x14\x91\x7e\x1c'
 
445
                         b'\x91S&\x03and\x91\x18,', third_delta)
 
446
        # Now create a delta, which we know won't be able to be 'fit' into the
 
447
        # existing index
 
448
        fourth_delta = di.make_delta(_fourth_text)
 
449
        self.assertEqual(_fourth_text,
 
450
                         self._gc_module.apply_delta(source, fourth_delta))
 
451
        self.assertEqual(b'\x80\x01'
 
452
                         b'\x7f123456789012345\nsame rabin hash\n'
 
453
                         b'123456789012345\nsame rabin hash\n'
 
454
                         b'123456789012345\nsame rabin hash\n'
 
455
                         b'123456789012345\nsame rabin hash'
 
456
                         b'\x01\n', fourth_delta)
 
457
        di.add_delta_source(fourth_delta, 0)
 
458
        source += fourth_delta
 
459
        # With the next delta, everything should be found
 
460
        fifth_delta = di.make_delta(_fourth_text)
 
461
        self.assertEqual(_fourth_text,
 
462
                         self._gc_module.apply_delta(source, fifth_delta))
 
463
        self.assertEqual(b'\x80\x01\x91\xa7\x7f\x01\n', fifth_delta)
 
464
 
 
465
 
 
466
class TestCopyInstruction(tests.TestCase):
 
467
 
 
468
    def assertEncode(self, expected, offset, length):
 
469
        data = _groupcompress_py.encode_copy_instruction(offset, length)
 
470
        self.assertEqual(expected, data)
 
471
 
 
472
    def assertDecode(self, exp_offset, exp_length, exp_newpos, data, pos):
 
473
        cmd = indexbytes(data, pos)
 
474
        pos += 1
 
475
        out = _groupcompress_py.decode_copy_instruction(data, cmd, pos)
 
476
        self.assertEqual((exp_offset, exp_length, exp_newpos), out)
 
477
 
 
478
    def test_encode_no_length(self):
 
479
        self.assertEncode(b'\x80', 0, 64 * 1024)
 
480
        self.assertEncode(b'\x81\x01', 1, 64 * 1024)
 
481
        self.assertEncode(b'\x81\x0a', 10, 64 * 1024)
 
482
        self.assertEncode(b'\x81\xff', 255, 64 * 1024)
 
483
        self.assertEncode(b'\x82\x01', 256, 64 * 1024)
 
484
        self.assertEncode(b'\x83\x01\x01', 257, 64 * 1024)
 
485
        self.assertEncode(b'\x8F\xff\xff\xff\xff', 0xFFFFFFFF, 64 * 1024)
 
486
        self.assertEncode(b'\x8E\xff\xff\xff', 0xFFFFFF00, 64 * 1024)
 
487
        self.assertEncode(b'\x8D\xff\xff\xff', 0xFFFF00FF, 64 * 1024)
 
488
        self.assertEncode(b'\x8B\xff\xff\xff', 0xFF00FFFF, 64 * 1024)
 
489
        self.assertEncode(b'\x87\xff\xff\xff', 0x00FFFFFF, 64 * 1024)
 
490
        self.assertEncode(b'\x8F\x04\x03\x02\x01', 0x01020304, 64 * 1024)
 
491
 
 
492
    def test_encode_no_offset(self):
 
493
        self.assertEncode(b'\x90\x01', 0, 1)
 
494
        self.assertEncode(b'\x90\x0a', 0, 10)
 
495
        self.assertEncode(b'\x90\xff', 0, 255)
 
496
        self.assertEncode(b'\xA0\x01', 0, 256)
 
497
        self.assertEncode(b'\xB0\x01\x01', 0, 257)
 
498
        self.assertEncode(b'\xB0\xff\xff', 0, 0xFFFF)
 
499
        # Special case, if copy == 64KiB, then we store exactly 0
 
500
        # Note that this puns with a copy of exactly 0 bytes, but we don't care
 
501
        # about that, as we would never actually copy 0 bytes
 
502
        self.assertEncode(b'\x80', 0, 64 * 1024)
 
503
 
 
504
    def test_encode(self):
 
505
        self.assertEncode(b'\x91\x01\x01', 1, 1)
 
506
        self.assertEncode(b'\x91\x09\x0a', 9, 10)
 
507
        self.assertEncode(b'\x91\xfe\xff', 254, 255)
 
508
        self.assertEncode(b'\xA2\x02\x01', 512, 256)
 
509
        self.assertEncode(b'\xB3\x02\x01\x01\x01', 258, 257)
 
510
        self.assertEncode(b'\xB0\x01\x01', 0, 257)
 
511
        # Special case, if copy == 64KiB, then we store exactly 0
 
512
        # Note that this puns with a copy of exactly 0 bytes, but we don't care
 
513
        # about that, as we would never actually copy 0 bytes
 
514
        self.assertEncode(b'\x81\x0a', 10, 64 * 1024)
 
515
 
 
516
    def test_decode_no_length(self):
 
517
        # If length is 0, it is interpreted as 64KiB
 
518
        # The shortest possible instruction is a copy of 64KiB from offset 0
 
519
        self.assertDecode(0, 65536, 1, b'\x80', 0)
 
520
        self.assertDecode(1, 65536, 2, b'\x81\x01', 0)
 
521
        self.assertDecode(10, 65536, 2, b'\x81\x0a', 0)
 
522
        self.assertDecode(255, 65536, 2, b'\x81\xff', 0)
 
523
        self.assertDecode(256, 65536, 2, b'\x82\x01', 0)
 
524
        self.assertDecode(257, 65536, 3, b'\x83\x01\x01', 0)
 
525
        self.assertDecode(0xFFFFFFFF, 65536, 5, b'\x8F\xff\xff\xff\xff', 0)
 
526
        self.assertDecode(0xFFFFFF00, 65536, 4, b'\x8E\xff\xff\xff', 0)
 
527
        self.assertDecode(0xFFFF00FF, 65536, 4, b'\x8D\xff\xff\xff', 0)
 
528
        self.assertDecode(0xFF00FFFF, 65536, 4, b'\x8B\xff\xff\xff', 0)
 
529
        self.assertDecode(0x00FFFFFF, 65536, 4, b'\x87\xff\xff\xff', 0)
 
530
        self.assertDecode(0x01020304, 65536, 5, b'\x8F\x04\x03\x02\x01', 0)
 
531
 
 
532
    def test_decode_no_offset(self):
 
533
        self.assertDecode(0, 1, 2, b'\x90\x01', 0)
 
534
        self.assertDecode(0, 10, 2, b'\x90\x0a', 0)
 
535
        self.assertDecode(0, 255, 2, b'\x90\xff', 0)
 
536
        self.assertDecode(0, 256, 2, b'\xA0\x01', 0)
 
537
        self.assertDecode(0, 257, 3, b'\xB0\x01\x01', 0)
 
538
        self.assertDecode(0, 65535, 3, b'\xB0\xff\xff', 0)
 
539
        # Special case, if copy == 64KiB, then we store exactly 0
 
540
        # Note that this puns with a copy of exactly 0 bytes, but we don't care
 
541
        # about that, as we would never actually copy 0 bytes
 
542
        self.assertDecode(0, 65536, 1, b'\x80', 0)
 
543
 
 
544
    def test_decode(self):
 
545
        self.assertDecode(1, 1, 3, b'\x91\x01\x01', 0)
 
546
        self.assertDecode(9, 10, 3, b'\x91\x09\x0a', 0)
 
547
        self.assertDecode(254, 255, 3, b'\x91\xfe\xff', 0)
 
548
        self.assertDecode(512, 256, 3, b'\xA2\x02\x01', 0)
 
549
        self.assertDecode(258, 257, 5, b'\xB3\x02\x01\x01\x01', 0)
 
550
        self.assertDecode(0, 257, 3, b'\xB0\x01\x01', 0)
 
551
 
 
552
    def test_decode_not_start(self):
 
553
        self.assertDecode(1, 1, 6, b'abc\x91\x01\x01def', 3)
 
554
        self.assertDecode(9, 10, 5, b'ab\x91\x09\x0ade', 2)
 
555
        self.assertDecode(254, 255, 6, b'not\x91\xfe\xffcopy', 3)
 
556
 
 
557
 
 
558
class TestBase128Int(tests.TestCase):
 
559
 
 
560
    scenarios = module_scenarios()
 
561
 
 
562
    _gc_module = None  # Set by load_tests
 
563
 
 
564
    def assertEqualEncode(self, bytes, val):
 
565
        self.assertEqual(bytes, self._gc_module.encode_base128_int(val))
 
566
 
 
567
    def assertEqualDecode(self, val, num_decode, bytes):
 
568
        self.assertEqual((val, num_decode),
 
569
                         self._gc_module.decode_base128_int(bytes))
 
570
 
 
571
    def test_encode(self):
 
572
        self.assertEqualEncode(b'\x01', 1)
 
573
        self.assertEqualEncode(b'\x02', 2)
 
574
        self.assertEqualEncode(b'\x7f', 127)
 
575
        self.assertEqualEncode(b'\x80\x01', 128)
 
576
        self.assertEqualEncode(b'\xff\x01', 255)
 
577
        self.assertEqualEncode(b'\x80\x02', 256)
 
578
        self.assertEqualEncode(b'\xff\xff\xff\xff\x0f', 0xFFFFFFFF)
 
579
 
 
580
    def test_decode(self):
 
581
        self.assertEqualDecode(1, 1, b'\x01')
 
582
        self.assertEqualDecode(2, 1, b'\x02')
 
583
        self.assertEqualDecode(127, 1, b'\x7f')
 
584
        self.assertEqualDecode(128, 2, b'\x80\x01')
 
585
        self.assertEqualDecode(255, 2, b'\xff\x01')
 
586
        self.assertEqualDecode(256, 2, b'\x80\x02')
 
587
        self.assertEqualDecode(0xFFFFFFFF, 5, b'\xff\xff\xff\xff\x0f')
 
588
 
 
589
    def test_decode_with_trailing_bytes(self):
 
590
        self.assertEqualDecode(1, 1, b'\x01abcdef')
 
591
        self.assertEqualDecode(127, 1, b'\x7f\x01')
 
592
        self.assertEqualDecode(128, 2, b'\x80\x01abcdef')
 
593
        self.assertEqualDecode(255, 2, b'\xff\x01\xff')