/brz/remove-bazaar

To get this branch, use:
bzr branch http://gegoxaren.bato24.eu/bzr/brz/remove-bazaar

« back to all changes in this revision

Viewing changes to breezy/tests/test__groupcompress.py

  • Committer: Jelmer Vernooij
  • Date: 2018-05-19 13:16:11 UTC
  • mto: (6968.4.3 git-archive)
  • mto: This revision was merged to the branch mainline in revision 6972.
  • Revision ID: jelmer@jelmer.uk-20180519131611-l9h9ud41j7qg1m03
Move tar/zip to breezy.archive.

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
# Copyright (C) 2008-2011 Canonical Ltd
 
2
#
 
3
# This program is free software; you can redistribute it and/or modify
 
4
# it under the terms of the GNU General Public License as published by
 
5
# the Free Software Foundation; either version 2 of the License, or
 
6
# (at your option) any later version.
 
7
#
 
8
# This program is distributed in the hope that it will be useful,
 
9
# but WITHOUT ANY WARRANTY; without even the implied warranty of
 
10
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
11
# GNU General Public License for more details.
 
12
#
 
13
# You should have received a copy of the GNU General Public License
 
14
# along with this program; if not, write to the Free Software
 
15
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
 
16
 
 
17
"""Tests for the python and pyrex extensions of groupcompress"""
 
18
 
 
19
from .. import (
 
20
    tests,
 
21
    )
 
22
from ..bzr import (
 
23
    _groupcompress_py,
 
24
    )
 
25
from .scenarios import (
 
26
    load_tests_apply_scenarios,
 
27
    )
 
28
from ..sixish import (
 
29
    indexbytes,
 
30
    )
 
31
from . import (
 
32
    features,
 
33
    )
 
34
 
 
35
 
 
36
def module_scenarios():
 
37
    scenarios = [
 
38
        ('python', {'_gc_module': _groupcompress_py}),
 
39
        ]
 
40
    if compiled_groupcompress_feature.available():
 
41
        gc_module = compiled_groupcompress_feature.module
 
42
        scenarios.append(('C',
 
43
            {'_gc_module': gc_module}))
 
44
    return scenarios
 
45
 
 
46
 
 
47
def two_way_scenarios():
 
48
    scenarios = [
 
49
        ('PP', {'make_delta': _groupcompress_py.make_delta,
 
50
                'apply_delta': _groupcompress_py.apply_delta})
 
51
        ]
 
52
    if compiled_groupcompress_feature.available():
 
53
        gc_module = compiled_groupcompress_feature.module
 
54
        scenarios.extend([
 
55
            ('CC', {'make_delta': gc_module.make_delta,
 
56
                    'apply_delta': gc_module.apply_delta}),
 
57
            ('PC', {'make_delta': _groupcompress_py.make_delta,
 
58
                    'apply_delta': gc_module.apply_delta}),
 
59
            ('CP', {'make_delta': gc_module.make_delta,
 
60
                    'apply_delta': _groupcompress_py.apply_delta}),
 
61
            ])
 
62
    return scenarios
 
63
 
 
64
 
 
65
load_tests = load_tests_apply_scenarios
 
66
 
 
67
 
 
68
compiled_groupcompress_feature = features.ModuleAvailableFeature(
 
69
    'breezy.bzr._groupcompress_pyx')
 
70
 
 
71
_text1 = b"""\
 
72
This is a bit
 
73
of source text
 
74
which is meant to be matched
 
75
against other text
 
76
"""
 
77
 
 
78
_text2 = b"""\
 
79
This is a bit
 
80
of source text
 
81
which is meant to differ from
 
82
against other text
 
83
"""
 
84
 
 
85
_text3 = b"""\
 
86
This is a bit
 
87
of source text
 
88
which is meant to be matched
 
89
against other text
 
90
except it also
 
91
has a lot more data
 
92
at the end of the file
 
93
"""
 
94
 
 
95
_first_text = b"""\
 
96
a bit of text, that
 
97
does not have much in
 
98
common with the next text
 
99
"""
 
100
 
 
101
_second_text = b"""\
 
102
some more bit of text, that
 
103
does not have much in
 
104
common with the previous text
 
105
and has some extra text
 
106
"""
 
107
 
 
108
 
 
109
_third_text = b"""\
 
110
a bit of text, that
 
111
has some in common with the previous text
 
112
and has some extra text
 
113
and not have much in
 
114
common with the next text
 
115
"""
 
116
 
 
117
_fourth_text = b"""\
 
118
123456789012345
 
119
same rabin hash
 
120
123456789012345
 
121
same rabin hash
 
122
123456789012345
 
123
same rabin hash
 
124
123456789012345
 
125
same rabin hash
 
126
"""
 
127
 
 
128
 
 
129
class TestMakeAndApplyDelta(tests.TestCase):
 
130
 
 
131
    scenarios = module_scenarios()
 
132
    _gc_module = None # Set by load_tests
 
133
 
 
134
    def setUp(self):
 
135
        super(TestMakeAndApplyDelta, self).setUp()
 
136
        self.make_delta = self._gc_module.make_delta
 
137
        self.apply_delta = self._gc_module.apply_delta
 
138
        self.apply_delta_to_source = self._gc_module.apply_delta_to_source
 
139
 
 
140
    def test_make_delta_is_typesafe(self):
 
141
        self.make_delta(b'a string', b'another string')
 
142
 
 
143
        def _check_make_delta(string1, string2):
 
144
            self.assertRaises(TypeError, self.make_delta, string1, string2)
 
145
 
 
146
        _check_make_delta(b'a string', object())
 
147
        _check_make_delta(b'a string', u'not a string')
 
148
        _check_make_delta(object(), b'a string')
 
149
        _check_make_delta(u'not a string', b'a string')
 
150
 
 
151
    def test_make_noop_delta(self):
 
152
        ident_delta = self.make_delta(_text1, _text1)
 
153
        self.assertEqual(b'M\x90M', ident_delta)
 
154
        ident_delta = self.make_delta(_text2, _text2)
 
155
        self.assertEqual(b'N\x90N', ident_delta)
 
156
        ident_delta = self.make_delta(_text3, _text3)
 
157
        self.assertEqual(b'\x87\x01\x90\x87', ident_delta)
 
158
 
 
159
    def assertDeltaIn(self, delta1, delta2, delta):
 
160
        """Make sure that the delta bytes match one of the expectations."""
 
161
        # In general, the python delta matcher gives different results than the
 
162
        # pyrex delta matcher. Both should be valid deltas, though.
 
163
        if delta not in (delta1, delta2):
 
164
            self.fail(b"Delta bytes:\n"
 
165
                      b"       %r\n"
 
166
                      b"not in %r\n"
 
167
                      b"    or %r"
 
168
                      % (delta, delta1, delta2))
 
169
 
 
170
    def test_make_delta(self):
 
171
        delta = self.make_delta(_text1, _text2)
 
172
        self.assertDeltaIn(
 
173
            b'N\x90/\x1fdiffer from\nagainst other text\n',
 
174
            b'N\x90\x1d\x1ewhich is meant to differ from\n\x91:\x13',
 
175
            delta)
 
176
        delta = self.make_delta(_text2, _text1)
 
177
        self.assertDeltaIn(
 
178
            b'M\x90/\x1ebe matched\nagainst other text\n',
 
179
            b'M\x90\x1d\x1dwhich is meant to be matched\n\x91;\x13',
 
180
            delta)
 
181
        delta = self.make_delta(_text3, _text1)
 
182
        self.assertEqual(b'M\x90M', delta)
 
183
        delta = self.make_delta(_text3, _text2)
 
184
        self.assertDeltaIn(
 
185
            b'N\x90/\x1fdiffer from\nagainst other text\n',
 
186
            b'N\x90\x1d\x1ewhich is meant to differ from\n\x91:\x13',
 
187
            delta)
 
188
 
 
189
    def test_make_delta_with_large_copies(self):
 
190
        # We want to have a copy that is larger than 64kB, which forces us to
 
191
        # issue multiple copy instructions.
 
192
        big_text = _text3 * 1220
 
193
        delta = self.make_delta(big_text, big_text)
 
194
        self.assertDeltaIn(
 
195
            b'\xdc\x86\x0a'      # Encoding the length of the uncompressed text
 
196
            b'\x80'              # Copy 64kB, starting at byte 0
 
197
            b'\x84\x01'          # and another 64kB starting at 64kB
 
198
            b'\xb4\x02\x5c\x83', # And the bit of tail.
 
199
            None,   # Both implementations should be identical
 
200
            delta)
 
201
 
 
202
    def test_apply_delta_is_typesafe(self):
 
203
        self.apply_delta(_text1, b'M\x90M')
 
204
        self.assertRaises(TypeError, self.apply_delta, object(), b'M\x90M')
 
205
        self.assertRaises(TypeError, self.apply_delta,
 
206
                          _text1.decode('latin1'), b'M\x90M')
 
207
        self.assertRaises(TypeError, self.apply_delta, _text1, u'M\x90M')
 
208
        self.assertRaises(TypeError, self.apply_delta, _text1, object())
 
209
 
 
210
    def test_apply_delta(self):
 
211
        target = self.apply_delta(_text1,
 
212
                    b'N\x90/\x1fdiffer from\nagainst other text\n')
 
213
        self.assertEqual(_text2, target)
 
214
        target = self.apply_delta(_text2,
 
215
                    b'M\x90/\x1ebe matched\nagainst other text\n')
 
216
        self.assertEqual(_text1, target)
 
217
 
 
218
    def test_apply_delta_to_source_is_safe(self):
 
219
        self.assertRaises(TypeError,
 
220
            self.apply_delta_to_source, object(), 0, 1)
 
221
        self.assertRaises(TypeError,
 
222
            self.apply_delta_to_source, u'unicode str', 0, 1)
 
223
        # end > length
 
224
        self.assertRaises(ValueError,
 
225
            self.apply_delta_to_source, b'foo', 1, 4)
 
226
        # start > length
 
227
        self.assertRaises(ValueError,
 
228
            self.apply_delta_to_source, b'foo', 5, 3)
 
229
        # start > end
 
230
        self.assertRaises(ValueError,
 
231
            self.apply_delta_to_source, b'foo', 3, 2)
 
232
 
 
233
    def test_apply_delta_to_source(self):
 
234
        source_and_delta = (_text1
 
235
                            + b'N\x90/\x1fdiffer from\nagainst other text\n')
 
236
        self.assertEqual(_text2, self.apply_delta_to_source(source_and_delta,
 
237
                                    len(_text1), len(source_and_delta)))
 
238
 
 
239
 
 
240
class TestMakeAndApplyCompatible(tests.TestCase):
 
241
 
 
242
    scenarios = two_way_scenarios()
 
243
 
 
244
    make_delta = None # Set by load_tests
 
245
    apply_delta = None # Set by load_tests
 
246
 
 
247
    def assertMakeAndApply(self, source, target):
 
248
        """Assert that generating a delta and applying gives success."""
 
249
        delta = self.make_delta(source, target)
 
250
        bytes = self.apply_delta(source, delta)
 
251
        self.assertEqualDiff(target, bytes)
 
252
 
 
253
    def test_direct(self):
 
254
        self.assertMakeAndApply(_text1, _text2)
 
255
        self.assertMakeAndApply(_text2, _text1)
 
256
        self.assertMakeAndApply(_text1, _text3)
 
257
        self.assertMakeAndApply(_text3, _text1)
 
258
        self.assertMakeAndApply(_text2, _text3)
 
259
        self.assertMakeAndApply(_text3, _text2)
 
260
 
 
261
 
 
262
class TestDeltaIndex(tests.TestCase):
 
263
 
 
264
    def setUp(self):
 
265
        super(TestDeltaIndex, self).setUp()
 
266
        # This test isn't multiplied, because we only have DeltaIndex for the
 
267
        # compiled form
 
268
        # We call this here, because _test_needs_features happens after setUp
 
269
        self.requireFeature(compiled_groupcompress_feature)
 
270
        self._gc_module = compiled_groupcompress_feature.module
 
271
 
 
272
    def test_repr(self):
 
273
        di = self._gc_module.DeltaIndex('test text\n')
 
274
        self.assertEqual('DeltaIndex(1, 10)', repr(di))
 
275
 
 
276
    def test__dump_no_index(self):
 
277
        di = self._gc_module.DeltaIndex()
 
278
        self.assertEqual(None, di._dump_index())
 
279
 
 
280
    def test__dump_index_simple(self):
 
281
        di = self._gc_module.DeltaIndex()
 
282
        di.add_source(_text1, 0)
 
283
        self.assertFalse(di._has_index())
 
284
        self.assertEqual(None, di._dump_index())
 
285
        _ = di.make_delta(_text1)
 
286
        self.assertTrue(di._has_index())
 
287
        hash_list, entry_list = di._dump_index()
 
288
        self.assertEqual(16, len(hash_list))
 
289
        self.assertEqual(68, len(entry_list))
 
290
        just_entries = [(idx, text_offset, hash_val)
 
291
                        for idx, (text_offset, hash_val)
 
292
                         in enumerate(entry_list)
 
293
                         if text_offset != 0 or hash_val != 0]
 
294
        rabin_hash = self._gc_module._rabin_hash
 
295
        self.assertEqual([(8, 16, rabin_hash(_text1[1:17])),
 
296
                          (25, 48, rabin_hash(_text1[33:49])),
 
297
                          (34, 32, rabin_hash(_text1[17:33])),
 
298
                          (47, 64, rabin_hash(_text1[49:65])),
 
299
                         ], just_entries)
 
300
        # This ensures that the hash map points to the location we expect it to
 
301
        for entry_idx, text_offset, hash_val in just_entries:
 
302
            self.assertEqual(entry_idx, hash_list[hash_val & 0xf])
 
303
 
 
304
    def test__dump_index_two_sources(self):
 
305
        di = self._gc_module.DeltaIndex()
 
306
        di.add_source(_text1, 0)
 
307
        di.add_source(_text2, 2)
 
308
        start2 = len(_text1) + 2
 
309
        self.assertTrue(di._has_index())
 
310
        hash_list, entry_list = di._dump_index()
 
311
        self.assertEqual(16, len(hash_list))
 
312
        self.assertEqual(68, len(entry_list))
 
313
        just_entries = [(idx, text_offset, hash_val)
 
314
                        for idx, (text_offset, hash_val)
 
315
                         in enumerate(entry_list)
 
316
                         if text_offset != 0 or hash_val != 0]
 
317
        rabin_hash = self._gc_module._rabin_hash
 
318
        self.assertEqual([(8, 16, rabin_hash(_text1[1:17])),
 
319
                          (9, start2+16, rabin_hash(_text2[1:17])),
 
320
                          (25, 48, rabin_hash(_text1[33:49])),
 
321
                          (30, start2+64, rabin_hash(_text2[49:65])),
 
322
                          (34, 32, rabin_hash(_text1[17:33])),
 
323
                          (35, start2+32, rabin_hash(_text2[17:33])),
 
324
                          (43, start2+48, rabin_hash(_text2[33:49])),
 
325
                          (47, 64, rabin_hash(_text1[49:65])),
 
326
                         ], just_entries)
 
327
        # Each entry should be in the appropriate hash bucket.
 
328
        for entry_idx, text_offset, hash_val in just_entries:
 
329
            hash_idx = hash_val & 0xf
 
330
            self.assertTrue(
 
331
                hash_list[hash_idx] <= entry_idx < hash_list[hash_idx+1])
 
332
 
 
333
    def test_first_add_source_doesnt_index_until_make_delta(self):
 
334
        di = self._gc_module.DeltaIndex()
 
335
        self.assertFalse(di._has_index())
 
336
        di.add_source(_text1, 0)
 
337
        self.assertFalse(di._has_index())
 
338
        # However, asking to make a delta will trigger the index to be
 
339
        # generated, and will generate a proper delta
 
340
        delta = di.make_delta(_text2)
 
341
        self.assertTrue(di._has_index())
 
342
        self.assertEqual('N\x90/\x1fdiffer from\nagainst other text\n', delta)
 
343
 
 
344
    def test_add_source_max_bytes_to_index(self):
 
345
        di = self._gc_module.DeltaIndex()
 
346
        di._max_bytes_to_index = 3*16
 
347
        di.add_source(_text1, 0) # (77 bytes -1) // 3 = 25 byte stride
 
348
        di.add_source(_text3, 3) # (135 bytes -1) // 3 = 44 byte stride
 
349
        start2 = len(_text1) + 3
 
350
        hash_list, entry_list = di._dump_index()
 
351
        self.assertEqual(16, len(hash_list))
 
352
        self.assertEqual(67, len(entry_list))
 
353
        just_entries = sorted([(text_offset, hash_val)
 
354
                               for text_offset, hash_val in entry_list
 
355
                                if text_offset != 0 or hash_val != 0])
 
356
        rabin_hash = self._gc_module._rabin_hash
 
357
        self.assertEqual([(25, rabin_hash(_text1[10:26])),
 
358
                          (50, rabin_hash(_text1[35:51])),
 
359
                          (75, rabin_hash(_text1[60:76])),
 
360
                          (start2+44, rabin_hash(_text3[29:45])),
 
361
                          (start2+88, rabin_hash(_text3[73:89])),
 
362
                          (start2+132, rabin_hash(_text3[117:133])),
 
363
                         ], just_entries)
 
364
 
 
365
    def test_second_add_source_triggers_make_index(self):
 
366
        di = self._gc_module.DeltaIndex()
 
367
        self.assertFalse(di._has_index())
 
368
        di.add_source(_text1, 0)
 
369
        self.assertFalse(di._has_index())
 
370
        di.add_source(_text2, 0)
 
371
        self.assertTrue(di._has_index())
 
372
 
 
373
    def test_make_delta(self):
 
374
        di = self._gc_module.DeltaIndex(_text1)
 
375
        delta = di.make_delta(_text2)
 
376
        self.assertEqual('N\x90/\x1fdiffer from\nagainst other text\n', delta)
 
377
 
 
378
    def test_delta_against_multiple_sources(self):
 
379
        di = self._gc_module.DeltaIndex()
 
380
        di.add_source(_first_text, 0)
 
381
        self.assertEqual(len(_first_text), di._source_offset)
 
382
        di.add_source(_second_text, 0)
 
383
        self.assertEqual(len(_first_text) + len(_second_text),
 
384
                         di._source_offset)
 
385
        delta = di.make_delta(_third_text)
 
386
        result = self._gc_module.apply_delta(_first_text + _second_text, delta)
 
387
        self.assertEqualDiff(_third_text, result)
 
388
        self.assertEqual('\x85\x01\x90\x14\x0chas some in '
 
389
                         '\x91v6\x03and\x91d"\x91:\n', delta)
 
390
 
 
391
    def test_delta_with_offsets(self):
 
392
        di = self._gc_module.DeltaIndex()
 
393
        di.add_source(_first_text, 5)
 
394
        self.assertEqual(len(_first_text) + 5, di._source_offset)
 
395
        di.add_source(_second_text, 10)
 
396
        self.assertEqual(len(_first_text) + len(_second_text) + 15,
 
397
                         di._source_offset)
 
398
        delta = di.make_delta(_third_text)
 
399
        self.assertIsNot(None, delta)
 
400
        result = self._gc_module.apply_delta(
 
401
            '12345' + _first_text + '1234567890' + _second_text, delta)
 
402
        self.assertIsNot(None, result)
 
403
        self.assertEqualDiff(_third_text, result)
 
404
        self.assertEqual('\x85\x01\x91\x05\x14\x0chas some in '
 
405
                         '\x91\x856\x03and\x91s"\x91?\n', delta)
 
406
 
 
407
    def test_delta_with_delta_bytes(self):
 
408
        di = self._gc_module.DeltaIndex()
 
409
        source = _first_text
 
410
        di.add_source(_first_text, 0)
 
411
        self.assertEqual(len(_first_text), di._source_offset)
 
412
        delta = di.make_delta(_second_text)
 
413
        self.assertEqual('h\tsome more\x91\x019'
 
414
                         '&previous text\nand has some extra text\n', delta)
 
415
        di.add_delta_source(delta, 0)
 
416
        source += delta
 
417
        self.assertEqual(len(_first_text) + len(delta), di._source_offset)
 
418
        second_delta = di.make_delta(_third_text)
 
419
        result = self._gc_module.apply_delta(source, second_delta)
 
420
        self.assertEqualDiff(_third_text, result)
 
421
        # We should be able to match against the
 
422
        # 'previous text\nand has some...'  that was part of the delta bytes
 
423
        # Note that we don't match the 'common with the', because it isn't long
 
424
        # enough to match in the original text, and those bytes are not present
 
425
        # in the delta for the second text.
 
426
        self.assertEqual('\x85\x01\x90\x14\x1chas some in common with the '
 
427
                         '\x91S&\x03and\x91\x18,', second_delta)
 
428
        # Add this delta, and create a new delta for the same text. We should
 
429
        # find the remaining text, and only insert the short 'and' text.
 
430
        di.add_delta_source(second_delta, 0)
 
431
        source += second_delta
 
432
        third_delta = di.make_delta(_third_text)
 
433
        result = self._gc_module.apply_delta(source, third_delta)
 
434
        self.assertEqualDiff(_third_text, result)
 
435
        self.assertEqual('\x85\x01\x90\x14\x91\x7e\x1c'
 
436
                         '\x91S&\x03and\x91\x18,', third_delta)
 
437
        # Now create a delta, which we know won't be able to be 'fit' into the
 
438
        # existing index
 
439
        fourth_delta = di.make_delta(_fourth_text)
 
440
        self.assertEqual(_fourth_text,
 
441
                         self._gc_module.apply_delta(source, fourth_delta))
 
442
        self.assertEqual('\x80\x01'
 
443
                         '\x7f123456789012345\nsame rabin hash\n'
 
444
                         '123456789012345\nsame rabin hash\n'
 
445
                         '123456789012345\nsame rabin hash\n'
 
446
                         '123456789012345\nsame rabin hash'
 
447
                         '\x01\n', fourth_delta)
 
448
        di.add_delta_source(fourth_delta, 0)
 
449
        source += fourth_delta
 
450
        # With the next delta, everything should be found
 
451
        fifth_delta = di.make_delta(_fourth_text)
 
452
        self.assertEqual(_fourth_text,
 
453
                         self._gc_module.apply_delta(source, fifth_delta))
 
454
        self.assertEqual('\x80\x01\x91\xa7\x7f\x01\n', fifth_delta)
 
455
 
 
456
 
 
457
class TestCopyInstruction(tests.TestCase):
 
458
 
 
459
    def assertEncode(self, expected, offset, length):
 
460
        data = _groupcompress_py.encode_copy_instruction(offset, length)
 
461
        self.assertEqual(expected, data)
 
462
 
 
463
    def assertDecode(self, exp_offset, exp_length, exp_newpos, data, pos):
 
464
        cmd = indexbytes(data, pos)
 
465
        pos += 1
 
466
        out = _groupcompress_py.decode_copy_instruction(data, cmd, pos)
 
467
        self.assertEqual((exp_offset, exp_length, exp_newpos), out)
 
468
 
 
469
    def test_encode_no_length(self):
 
470
        self.assertEncode(b'\x80', 0, 64*1024)
 
471
        self.assertEncode(b'\x81\x01', 1, 64*1024)
 
472
        self.assertEncode(b'\x81\x0a', 10, 64*1024)
 
473
        self.assertEncode(b'\x81\xff', 255, 64*1024)
 
474
        self.assertEncode(b'\x82\x01', 256, 64*1024)
 
475
        self.assertEncode(b'\x83\x01\x01', 257, 64*1024)
 
476
        self.assertEncode(b'\x8F\xff\xff\xff\xff', 0xFFFFFFFF, 64*1024)
 
477
        self.assertEncode(b'\x8E\xff\xff\xff', 0xFFFFFF00, 64*1024)
 
478
        self.assertEncode(b'\x8D\xff\xff\xff', 0xFFFF00FF, 64*1024)
 
479
        self.assertEncode(b'\x8B\xff\xff\xff', 0xFF00FFFF, 64*1024)
 
480
        self.assertEncode(b'\x87\xff\xff\xff', 0x00FFFFFF, 64*1024)
 
481
        self.assertEncode(b'\x8F\x04\x03\x02\x01', 0x01020304, 64*1024)
 
482
 
 
483
    def test_encode_no_offset(self):
 
484
        self.assertEncode(b'\x90\x01', 0, 1)
 
485
        self.assertEncode(b'\x90\x0a', 0, 10)
 
486
        self.assertEncode(b'\x90\xff', 0, 255)
 
487
        self.assertEncode(b'\xA0\x01', 0, 256)
 
488
        self.assertEncode(b'\xB0\x01\x01', 0, 257)
 
489
        self.assertEncode(b'\xB0\xff\xff', 0, 0xFFFF)
 
490
        # Special case, if copy == 64KiB, then we store exactly 0
 
491
        # Note that this puns with a copy of exactly 0 bytes, but we don't care
 
492
        # about that, as we would never actually copy 0 bytes
 
493
        self.assertEncode(b'\x80', 0, 64*1024)
 
494
 
 
495
    def test_encode(self):
 
496
        self.assertEncode(b'\x91\x01\x01', 1, 1)
 
497
        self.assertEncode(b'\x91\x09\x0a', 9, 10)
 
498
        self.assertEncode(b'\x91\xfe\xff', 254, 255)
 
499
        self.assertEncode(b'\xA2\x02\x01', 512, 256)
 
500
        self.assertEncode(b'\xB3\x02\x01\x01\x01', 258, 257)
 
501
        self.assertEncode(b'\xB0\x01\x01', 0, 257)
 
502
        # Special case, if copy == 64KiB, then we store exactly 0
 
503
        # Note that this puns with a copy of exactly 0 bytes, but we don't care
 
504
        # about that, as we would never actually copy 0 bytes
 
505
        self.assertEncode(b'\x81\x0a', 10, 64*1024)
 
506
 
 
507
    def test_decode_no_length(self):
 
508
        # If length is 0, it is interpreted as 64KiB
 
509
        # The shortest possible instruction is a copy of 64KiB from offset 0
 
510
        self.assertDecode(0, 65536, 1, b'\x80', 0)
 
511
        self.assertDecode(1, 65536, 2, b'\x81\x01', 0)
 
512
        self.assertDecode(10, 65536, 2, b'\x81\x0a', 0)
 
513
        self.assertDecode(255, 65536, 2, b'\x81\xff', 0)
 
514
        self.assertDecode(256, 65536, 2, b'\x82\x01', 0)
 
515
        self.assertDecode(257, 65536, 3, b'\x83\x01\x01', 0)
 
516
        self.assertDecode(0xFFFFFFFF, 65536, 5, b'\x8F\xff\xff\xff\xff', 0)
 
517
        self.assertDecode(0xFFFFFF00, 65536, 4, b'\x8E\xff\xff\xff', 0)
 
518
        self.assertDecode(0xFFFF00FF, 65536, 4, b'\x8D\xff\xff\xff', 0)
 
519
        self.assertDecode(0xFF00FFFF, 65536, 4, b'\x8B\xff\xff\xff', 0)
 
520
        self.assertDecode(0x00FFFFFF, 65536, 4, b'\x87\xff\xff\xff', 0)
 
521
        self.assertDecode(0x01020304, 65536, 5, b'\x8F\x04\x03\x02\x01', 0)
 
522
 
 
523
    def test_decode_no_offset(self):
 
524
        self.assertDecode(0, 1, 2, b'\x90\x01', 0)
 
525
        self.assertDecode(0, 10, 2, b'\x90\x0a', 0)
 
526
        self.assertDecode(0, 255, 2, b'\x90\xff', 0)
 
527
        self.assertDecode(0, 256, 2, b'\xA0\x01', 0)
 
528
        self.assertDecode(0, 257, 3, b'\xB0\x01\x01', 0)
 
529
        self.assertDecode(0, 65535, 3, b'\xB0\xff\xff', 0)
 
530
        # Special case, if copy == 64KiB, then we store exactly 0
 
531
        # Note that this puns with a copy of exactly 0 bytes, but we don't care
 
532
        # about that, as we would never actually copy 0 bytes
 
533
        self.assertDecode(0, 65536, 1, b'\x80', 0)
 
534
 
 
535
    def test_decode(self):
 
536
        self.assertDecode(1, 1, 3, b'\x91\x01\x01', 0)
 
537
        self.assertDecode(9, 10, 3, b'\x91\x09\x0a', 0)
 
538
        self.assertDecode(254, 255, 3, b'\x91\xfe\xff', 0)
 
539
        self.assertDecode(512, 256, 3, b'\xA2\x02\x01', 0)
 
540
        self.assertDecode(258, 257, 5, b'\xB3\x02\x01\x01\x01', 0)
 
541
        self.assertDecode(0, 257, 3, b'\xB0\x01\x01', 0)
 
542
 
 
543
    def test_decode_not_start(self):
 
544
        self.assertDecode(1, 1, 6, b'abc\x91\x01\x01def', 3)
 
545
        self.assertDecode(9, 10, 5, b'ab\x91\x09\x0ade', 2)
 
546
        self.assertDecode(254, 255, 6, b'not\x91\xfe\xffcopy', 3)
 
547
 
 
548
 
 
549
class TestBase128Int(tests.TestCase):
 
550
 
 
551
    scenarios = module_scenarios()
 
552
 
 
553
    _gc_module = None # Set by load_tests
 
554
 
 
555
    def assertEqualEncode(self, bytes, val):
 
556
        self.assertEqual(bytes, self._gc_module.encode_base128_int(val))
 
557
 
 
558
    def assertEqualDecode(self, val, num_decode, bytes):
 
559
        self.assertEqual((val, num_decode),
 
560
                         self._gc_module.decode_base128_int(bytes))
 
561
 
 
562
    def test_encode(self):
 
563
        self.assertEqualEncode(b'\x01', 1)
 
564
        self.assertEqualEncode(b'\x02', 2)
 
565
        self.assertEqualEncode(b'\x7f', 127)
 
566
        self.assertEqualEncode(b'\x80\x01', 128)
 
567
        self.assertEqualEncode(b'\xff\x01', 255)
 
568
        self.assertEqualEncode(b'\x80\x02', 256)
 
569
        self.assertEqualEncode(b'\xff\xff\xff\xff\x0f', 0xFFFFFFFF)
 
570
 
 
571
    def test_decode(self):
 
572
        self.assertEqualDecode(1, 1, b'\x01')
 
573
        self.assertEqualDecode(2, 1, b'\x02')
 
574
        self.assertEqualDecode(127, 1, b'\x7f')
 
575
        self.assertEqualDecode(128, 2, b'\x80\x01')
 
576
        self.assertEqualDecode(255, 2, b'\xff\x01')
 
577
        self.assertEqualDecode(256, 2, b'\x80\x02')
 
578
        self.assertEqualDecode(0xFFFFFFFF, 5, b'\xff\xff\xff\xff\x0f')
 
579
 
 
580
    def test_decode_with_trailing_bytes(self):
 
581
        self.assertEqualDecode(1, 1, b'\x01abcdef')
 
582
        self.assertEqualDecode(127, 1, b'\x7f\x01')
 
583
        self.assertEqualDecode(128, 2, b'\x80\x01abcdef')
 
584
        self.assertEqualDecode(255, 2, b'\xff\x01\xff')
 
585
 
 
586