/brz/remove-bazaar

To get this branch, use:
bzr branch http://gegoxaren.bato24.eu/bzr/brz/remove-bazaar

« back to all changes in this revision

Viewing changes to breezy/tests/test__groupcompress.py

  • Committer: John Arbash Meinel
  • Date: 2006-04-25 15:05:42 UTC
  • mfrom: (1185.85.85 bzr-encoding)
  • mto: This revision was merged to the branch mainline in revision 1752.
  • Revision ID: john@arbash-meinel.com-20060425150542-c7b518dca9928691
[merge] the old bzr-encoding changes, reparenting them on bzr.dev

Show diffs side-by-side

added added

removed removed

Lines of Context:
1
 
# Copyright (C) 2008-2011 Canonical Ltd
2
 
#
3
 
# This program is free software; you can redistribute it and/or modify
4
 
# it under the terms of the GNU General Public License as published by
5
 
# the Free Software Foundation; either version 2 of the License, or
6
 
# (at your option) any later version.
7
 
#
8
 
# This program is distributed in the hope that it will be useful,
9
 
# but WITHOUT ANY WARRANTY; without even the implied warranty of
10
 
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
11
 
# GNU General Public License for more details.
12
 
#
13
 
# You should have received a copy of the GNU General Public License
14
 
# along with this program; if not, write to the Free Software
15
 
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
16
 
 
17
 
"""Tests for the python and pyrex extensions of groupcompress"""
18
 
 
19
 
import sys
20
 
 
21
 
from .. import (
22
 
    tests,
23
 
    )
24
 
from ..bzr import (
25
 
    _groupcompress_py,
26
 
    )
27
 
from .scenarios import (
28
 
    load_tests_apply_scenarios,
29
 
    )
30
 
from ..sixish import (
31
 
    indexbytes,
32
 
    )
33
 
from . import (
34
 
    features,
35
 
    )
36
 
 
37
 
 
38
 
def module_scenarios():
39
 
    scenarios = [
40
 
        ('python', {'_gc_module': _groupcompress_py}),
41
 
        ]
42
 
    if compiled_groupcompress_feature.available():
43
 
        gc_module = compiled_groupcompress_feature.module
44
 
        scenarios.append(('C',
45
 
                          {'_gc_module': gc_module}))
46
 
    return scenarios
47
 
 
48
 
 
49
 
def two_way_scenarios():
50
 
    scenarios = [
51
 
        ('PP', {'make_delta': _groupcompress_py.make_delta,
52
 
                'apply_delta': _groupcompress_py.apply_delta})
53
 
        ]
54
 
    if compiled_groupcompress_feature.available():
55
 
        gc_module = compiled_groupcompress_feature.module
56
 
        scenarios.extend([
57
 
            ('CC', {'make_delta': gc_module.make_delta,
58
 
                    'apply_delta': gc_module.apply_delta}),
59
 
            ('PC', {'make_delta': _groupcompress_py.make_delta,
60
 
                    'apply_delta': gc_module.apply_delta}),
61
 
            ('CP', {'make_delta': gc_module.make_delta,
62
 
                    'apply_delta': _groupcompress_py.apply_delta}),
63
 
            ])
64
 
    return scenarios
65
 
 
66
 
 
67
 
load_tests = load_tests_apply_scenarios
68
 
 
69
 
 
70
 
compiled_groupcompress_feature = features.ModuleAvailableFeature(
71
 
    'breezy.bzr._groupcompress_pyx')
72
 
 
73
 
_text1 = b"""\
74
 
This is a bit
75
 
of source text
76
 
which is meant to be matched
77
 
against other text
78
 
"""
79
 
 
80
 
_text2 = b"""\
81
 
This is a bit
82
 
of source text
83
 
which is meant to differ from
84
 
against other text
85
 
"""
86
 
 
87
 
_text3 = b"""\
88
 
This is a bit
89
 
of source text
90
 
which is meant to be matched
91
 
against other text
92
 
except it also
93
 
has a lot more data
94
 
at the end of the file
95
 
"""
96
 
 
97
 
_first_text = b"""\
98
 
a bit of text, that
99
 
does not have much in
100
 
common with the next text
101
 
"""
102
 
 
103
 
_second_text = b"""\
104
 
some more bit of text, that
105
 
does not have much in
106
 
common with the previous text
107
 
and has some extra text
108
 
"""
109
 
 
110
 
 
111
 
_third_text = b"""\
112
 
a bit of text, that
113
 
has some in common with the previous text
114
 
and has some extra text
115
 
and not have much in
116
 
common with the next text
117
 
"""
118
 
 
119
 
_fourth_text = b"""\
120
 
123456789012345
121
 
same rabin hash
122
 
123456789012345
123
 
same rabin hash
124
 
123456789012345
125
 
same rabin hash
126
 
123456789012345
127
 
same rabin hash
128
 
"""
129
 
 
130
 
 
131
 
class TestMakeAndApplyDelta(tests.TestCase):
132
 
 
133
 
    scenarios = module_scenarios()
134
 
    _gc_module = None  # Set by load_tests
135
 
 
136
 
    def setUp(self):
137
 
        super(TestMakeAndApplyDelta, self).setUp()
138
 
        self.make_delta = self._gc_module.make_delta
139
 
        self.apply_delta = self._gc_module.apply_delta
140
 
        self.apply_delta_to_source = self._gc_module.apply_delta_to_source
141
 
 
142
 
    def test_make_delta_is_typesafe(self):
143
 
        self.make_delta(b'a string', b'another string')
144
 
 
145
 
        def _check_make_delta(string1, string2):
146
 
            self.assertRaises(TypeError, self.make_delta, string1, string2)
147
 
 
148
 
        _check_make_delta(b'a string', object())
149
 
        _check_make_delta(b'a string', u'not a string')
150
 
        _check_make_delta(object(), b'a string')
151
 
        _check_make_delta(u'not a string', b'a string')
152
 
 
153
 
    def test_make_noop_delta(self):
154
 
        ident_delta = self.make_delta(_text1, _text1)
155
 
        self.assertEqual(b'M\x90M', ident_delta)
156
 
        ident_delta = self.make_delta(_text2, _text2)
157
 
        self.assertEqual(b'N\x90N', ident_delta)
158
 
        ident_delta = self.make_delta(_text3, _text3)
159
 
        self.assertEqual(b'\x87\x01\x90\x87', ident_delta)
160
 
 
161
 
    def assertDeltaIn(self, delta1, delta2, delta):
162
 
        """Make sure that the delta bytes match one of the expectations."""
163
 
        # In general, the python delta matcher gives different results than the
164
 
        # pyrex delta matcher. Both should be valid deltas, though.
165
 
        if delta not in (delta1, delta2):
166
 
            self.fail(b"Delta bytes:\n"
167
 
                      b"       %r\n"
168
 
                      b"not in %r\n"
169
 
                      b"    or %r"
170
 
                      % (delta, delta1, delta2))
171
 
 
172
 
    def test_make_delta(self):
173
 
        delta = self.make_delta(_text1, _text2)
174
 
        self.assertDeltaIn(
175
 
            b'N\x90/\x1fdiffer from\nagainst other text\n',
176
 
            b'N\x90\x1d\x1ewhich is meant to differ from\n\x91:\x13',
177
 
            delta)
178
 
        delta = self.make_delta(_text2, _text1)
179
 
        self.assertDeltaIn(
180
 
            b'M\x90/\x1ebe matched\nagainst other text\n',
181
 
            b'M\x90\x1d\x1dwhich is meant to be matched\n\x91;\x13',
182
 
            delta)
183
 
        delta = self.make_delta(_text3, _text1)
184
 
        self.assertEqual(b'M\x90M', delta)
185
 
        delta = self.make_delta(_text3, _text2)
186
 
        self.assertDeltaIn(
187
 
            b'N\x90/\x1fdiffer from\nagainst other text\n',
188
 
            b'N\x90\x1d\x1ewhich is meant to differ from\n\x91:\x13',
189
 
            delta)
190
 
 
191
 
    def test_make_delta_with_large_copies(self):
192
 
        # We want to have a copy that is larger than 64kB, which forces us to
193
 
        # issue multiple copy instructions.
194
 
        big_text = _text3 * 1220
195
 
        delta = self.make_delta(big_text, big_text)
196
 
        self.assertDeltaIn(
197
 
            b'\xdc\x86\x0a'      # Encoding the length of the uncompressed text
198
 
            b'\x80'              # Copy 64kB, starting at byte 0
199
 
            b'\x84\x01'          # and another 64kB starting at 64kB
200
 
            b'\xb4\x02\x5c\x83',  # And the bit of tail.
201
 
            None,   # Both implementations should be identical
202
 
            delta)
203
 
 
204
 
    def test_apply_delta_is_typesafe(self):
205
 
        self.apply_delta(_text1, b'M\x90M')
206
 
        self.assertRaises(TypeError, self.apply_delta, object(), b'M\x90M')
207
 
        self.assertRaises(TypeError, self.apply_delta,
208
 
                          _text1.decode('latin1'), b'M\x90M')
209
 
        self.assertRaises(TypeError, self.apply_delta, _text1, u'M\x90M')
210
 
        self.assertRaises(TypeError, self.apply_delta, _text1, object())
211
 
 
212
 
    def test_apply_delta(self):
213
 
        target = self.apply_delta(_text1,
214
 
                                  b'N\x90/\x1fdiffer from\nagainst other text\n')
215
 
        self.assertEqual(_text2, target)
216
 
        target = self.apply_delta(_text2,
217
 
                                  b'M\x90/\x1ebe matched\nagainst other text\n')
218
 
        self.assertEqual(_text1, target)
219
 
 
220
 
    def test_apply_delta_to_source_is_safe(self):
221
 
        self.assertRaises(TypeError,
222
 
                          self.apply_delta_to_source, object(), 0, 1)
223
 
        self.assertRaises(TypeError,
224
 
                          self.apply_delta_to_source, u'unicode str', 0, 1)
225
 
        # end > length
226
 
        self.assertRaises(ValueError,
227
 
                          self.apply_delta_to_source, b'foo', 1, 4)
228
 
        # start > length
229
 
        self.assertRaises(ValueError,
230
 
                          self.apply_delta_to_source, b'foo', 5, 3)
231
 
        # start > end
232
 
        self.assertRaises(ValueError,
233
 
                          self.apply_delta_to_source, b'foo', 3, 2)
234
 
 
235
 
    def test_apply_delta_to_source(self):
236
 
        source_and_delta = (_text1
237
 
                            + b'N\x90/\x1fdiffer from\nagainst other text\n')
238
 
        self.assertEqual(_text2, self.apply_delta_to_source(source_and_delta,
239
 
                                                            len(_text1), len(source_and_delta)))
240
 
 
241
 
 
242
 
class TestMakeAndApplyCompatible(tests.TestCase):
243
 
 
244
 
    scenarios = two_way_scenarios()
245
 
 
246
 
    make_delta = None  # Set by load_tests
247
 
    apply_delta = None  # Set by load_tests
248
 
 
249
 
    def assertMakeAndApply(self, source, target):
250
 
        """Assert that generating a delta and applying gives success."""
251
 
        delta = self.make_delta(source, target)
252
 
        bytes = self.apply_delta(source, delta)
253
 
        self.assertEqualDiff(target, bytes)
254
 
 
255
 
    def test_direct(self):
256
 
        self.assertMakeAndApply(_text1, _text2)
257
 
        self.assertMakeAndApply(_text2, _text1)
258
 
        self.assertMakeAndApply(_text1, _text3)
259
 
        self.assertMakeAndApply(_text3, _text1)
260
 
        self.assertMakeAndApply(_text2, _text3)
261
 
        self.assertMakeAndApply(_text3, _text2)
262
 
 
263
 
 
264
 
class TestDeltaIndex(tests.TestCase):
265
 
 
266
 
    def setUp(self):
267
 
        super(TestDeltaIndex, self).setUp()
268
 
        # This test isn't multiplied, because we only have DeltaIndex for the
269
 
        # compiled form
270
 
        # We call this here, because _test_needs_features happens after setUp
271
 
        self.requireFeature(compiled_groupcompress_feature)
272
 
        self._gc_module = compiled_groupcompress_feature.module
273
 
 
274
 
    def test_repr(self):
275
 
        di = self._gc_module.DeltaIndex(b'test text\n')
276
 
        self.assertEqual('DeltaIndex(1, 10)', repr(di))
277
 
 
278
 
    def test_sizeof(self):
279
 
        di = self._gc_module.DeltaIndex()
280
 
        # Exact value will depend on platform but should include sources
281
 
        # source_info is a pointer and two longs so at least 12 bytes
282
 
        lower_bound = di._max_num_sources * 12
283
 
        self.assertGreater(sys.getsizeof(di), lower_bound)
284
 
 
285
 
    def test__dump_no_index(self):
286
 
        di = self._gc_module.DeltaIndex()
287
 
        self.assertEqual(None, di._dump_index())
288
 
 
289
 
    def test__dump_index_simple(self):
290
 
        di = self._gc_module.DeltaIndex()
291
 
        di.add_source(_text1, 0)
292
 
        self.assertFalse(di._has_index())
293
 
        self.assertEqual(None, di._dump_index())
294
 
        _ = di.make_delta(_text1)
295
 
        self.assertTrue(di._has_index())
296
 
        hash_list, entry_list = di._dump_index()
297
 
        self.assertEqual(16, len(hash_list))
298
 
        self.assertEqual(68, len(entry_list))
299
 
        just_entries = [(idx, text_offset, hash_val)
300
 
                        for idx, (text_offset, hash_val)
301
 
                        in enumerate(entry_list)
302
 
                        if text_offset != 0 or hash_val != 0]
303
 
        rabin_hash = self._gc_module._rabin_hash
304
 
        self.assertEqual([(8, 16, rabin_hash(_text1[1:17])),
305
 
                          (25, 48, rabin_hash(_text1[33:49])),
306
 
                          (34, 32, rabin_hash(_text1[17:33])),
307
 
                          (47, 64, rabin_hash(_text1[49:65])),
308
 
                          ], just_entries)
309
 
        # This ensures that the hash map points to the location we expect it to
310
 
        for entry_idx, text_offset, hash_val in just_entries:
311
 
            self.assertEqual(entry_idx, hash_list[hash_val & 0xf])
312
 
 
313
 
    def test__dump_index_two_sources(self):
314
 
        di = self._gc_module.DeltaIndex()
315
 
        di.add_source(_text1, 0)
316
 
        di.add_source(_text2, 2)
317
 
        start2 = len(_text1) + 2
318
 
        self.assertTrue(di._has_index())
319
 
        hash_list, entry_list = di._dump_index()
320
 
        self.assertEqual(16, len(hash_list))
321
 
        self.assertEqual(68, len(entry_list))
322
 
        just_entries = [(idx, text_offset, hash_val)
323
 
                        for idx, (text_offset, hash_val)
324
 
                        in enumerate(entry_list)
325
 
                        if text_offset != 0 or hash_val != 0]
326
 
        rabin_hash = self._gc_module._rabin_hash
327
 
        self.assertEqual([(8, 16, rabin_hash(_text1[1:17])),
328
 
                          (9, start2 + 16, rabin_hash(_text2[1:17])),
329
 
                          (25, 48, rabin_hash(_text1[33:49])),
330
 
                          (30, start2 + 64, rabin_hash(_text2[49:65])),
331
 
                          (34, 32, rabin_hash(_text1[17:33])),
332
 
                          (35, start2 + 32, rabin_hash(_text2[17:33])),
333
 
                          (43, start2 + 48, rabin_hash(_text2[33:49])),
334
 
                          (47, 64, rabin_hash(_text1[49:65])),
335
 
                          ], just_entries)
336
 
        # Each entry should be in the appropriate hash bucket.
337
 
        for entry_idx, text_offset, hash_val in just_entries:
338
 
            hash_idx = hash_val & 0xf
339
 
            self.assertTrue(
340
 
                hash_list[hash_idx] <= entry_idx < hash_list[hash_idx + 1])
341
 
 
342
 
    def test_first_add_source_doesnt_index_until_make_delta(self):
343
 
        di = self._gc_module.DeltaIndex()
344
 
        self.assertFalse(di._has_index())
345
 
        di.add_source(_text1, 0)
346
 
        self.assertFalse(di._has_index())
347
 
        # However, asking to make a delta will trigger the index to be
348
 
        # generated, and will generate a proper delta
349
 
        delta = di.make_delta(_text2)
350
 
        self.assertTrue(di._has_index())
351
 
        self.assertEqual(b'N\x90/\x1fdiffer from\nagainst other text\n', delta)
352
 
 
353
 
    def test_add_source_max_bytes_to_index(self):
354
 
        di = self._gc_module.DeltaIndex()
355
 
        di._max_bytes_to_index = 3 * 16
356
 
        di.add_source(_text1, 0)  # (77 bytes -1) // 3 = 25 byte stride
357
 
        di.add_source(_text3, 3)  # (135 bytes -1) // 3 = 44 byte stride
358
 
        start2 = len(_text1) + 3
359
 
        hash_list, entry_list = di._dump_index()
360
 
        self.assertEqual(16, len(hash_list))
361
 
        self.assertEqual(67, len(entry_list))
362
 
        just_entries = sorted([(text_offset, hash_val)
363
 
                               for text_offset, hash_val in entry_list
364
 
                               if text_offset != 0 or hash_val != 0])
365
 
        rabin_hash = self._gc_module._rabin_hash
366
 
        self.assertEqual([(25, rabin_hash(_text1[10:26])),
367
 
                          (50, rabin_hash(_text1[35:51])),
368
 
                          (75, rabin_hash(_text1[60:76])),
369
 
                          (start2 + 44, rabin_hash(_text3[29:45])),
370
 
                          (start2 + 88, rabin_hash(_text3[73:89])),
371
 
                          (start2 + 132, rabin_hash(_text3[117:133])),
372
 
                          ], just_entries)
373
 
 
374
 
    def test_second_add_source_triggers_make_index(self):
375
 
        di = self._gc_module.DeltaIndex()
376
 
        self.assertFalse(di._has_index())
377
 
        di.add_source(_text1, 0)
378
 
        self.assertFalse(di._has_index())
379
 
        di.add_source(_text2, 0)
380
 
        self.assertTrue(di._has_index())
381
 
 
382
 
    def test_make_delta(self):
383
 
        di = self._gc_module.DeltaIndex(_text1)
384
 
        delta = di.make_delta(_text2)
385
 
        self.assertEqual(b'N\x90/\x1fdiffer from\nagainst other text\n', delta)
386
 
 
387
 
    def test_delta_against_multiple_sources(self):
388
 
        di = self._gc_module.DeltaIndex()
389
 
        di.add_source(_first_text, 0)
390
 
        self.assertEqual(len(_first_text), di._source_offset)
391
 
        di.add_source(_second_text, 0)
392
 
        self.assertEqual(len(_first_text) + len(_second_text),
393
 
                         di._source_offset)
394
 
        delta = di.make_delta(_third_text)
395
 
        result = self._gc_module.apply_delta(_first_text + _second_text, delta)
396
 
        self.assertEqualDiff(_third_text, result)
397
 
        self.assertEqual(b'\x85\x01\x90\x14\x0chas some in '
398
 
                         b'\x91v6\x03and\x91d"\x91:\n', delta)
399
 
 
400
 
    def test_delta_with_offsets(self):
401
 
        di = self._gc_module.DeltaIndex()
402
 
        di.add_source(_first_text, 5)
403
 
        self.assertEqual(len(_first_text) + 5, di._source_offset)
404
 
        di.add_source(_second_text, 10)
405
 
        self.assertEqual(len(_first_text) + len(_second_text) + 15,
406
 
                         di._source_offset)
407
 
        delta = di.make_delta(_third_text)
408
 
        self.assertIsNot(None, delta)
409
 
        result = self._gc_module.apply_delta(
410
 
            b'12345' + _first_text + b'1234567890' + _second_text, delta)
411
 
        self.assertIsNot(None, result)
412
 
        self.assertEqualDiff(_third_text, result)
413
 
        self.assertEqual(b'\x85\x01\x91\x05\x14\x0chas some in '
414
 
                         b'\x91\x856\x03and\x91s"\x91?\n', delta)
415
 
 
416
 
    def test_delta_with_delta_bytes(self):
417
 
        di = self._gc_module.DeltaIndex()
418
 
        source = _first_text
419
 
        di.add_source(_first_text, 0)
420
 
        self.assertEqual(len(_first_text), di._source_offset)
421
 
        delta = di.make_delta(_second_text)
422
 
        self.assertEqual(b'h\tsome more\x91\x019'
423
 
                         b'&previous text\nand has some extra text\n', delta)
424
 
        di.add_delta_source(delta, 0)
425
 
        source += delta
426
 
        self.assertEqual(len(_first_text) + len(delta), di._source_offset)
427
 
        second_delta = di.make_delta(_third_text)
428
 
        result = self._gc_module.apply_delta(source, second_delta)
429
 
        self.assertEqualDiff(_third_text, result)
430
 
        # We should be able to match against the
431
 
        # 'previous text\nand has some...'  that was part of the delta bytes
432
 
        # Note that we don't match the 'common with the', because it isn't long
433
 
        # enough to match in the original text, and those bytes are not present
434
 
        # in the delta for the second text.
435
 
        self.assertEqual(b'\x85\x01\x90\x14\x1chas some in common with the '
436
 
                         b'\x91S&\x03and\x91\x18,', second_delta)
437
 
        # Add this delta, and create a new delta for the same text. We should
438
 
        # find the remaining text, and only insert the short 'and' text.
439
 
        di.add_delta_source(second_delta, 0)
440
 
        source += second_delta
441
 
        third_delta = di.make_delta(_third_text)
442
 
        result = self._gc_module.apply_delta(source, third_delta)
443
 
        self.assertEqualDiff(_third_text, result)
444
 
        self.assertEqual(b'\x85\x01\x90\x14\x91\x7e\x1c'
445
 
                         b'\x91S&\x03and\x91\x18,', third_delta)
446
 
        # Now create a delta, which we know won't be able to be 'fit' into the
447
 
        # existing index
448
 
        fourth_delta = di.make_delta(_fourth_text)
449
 
        self.assertEqual(_fourth_text,
450
 
                         self._gc_module.apply_delta(source, fourth_delta))
451
 
        self.assertEqual(b'\x80\x01'
452
 
                         b'\x7f123456789012345\nsame rabin hash\n'
453
 
                         b'123456789012345\nsame rabin hash\n'
454
 
                         b'123456789012345\nsame rabin hash\n'
455
 
                         b'123456789012345\nsame rabin hash'
456
 
                         b'\x01\n', fourth_delta)
457
 
        di.add_delta_source(fourth_delta, 0)
458
 
        source += fourth_delta
459
 
        # With the next delta, everything should be found
460
 
        fifth_delta = di.make_delta(_fourth_text)
461
 
        self.assertEqual(_fourth_text,
462
 
                         self._gc_module.apply_delta(source, fifth_delta))
463
 
        self.assertEqual(b'\x80\x01\x91\xa7\x7f\x01\n', fifth_delta)
464
 
 
465
 
 
466
 
class TestCopyInstruction(tests.TestCase):
467
 
 
468
 
    def assertEncode(self, expected, offset, length):
469
 
        data = _groupcompress_py.encode_copy_instruction(offset, length)
470
 
        self.assertEqual(expected, data)
471
 
 
472
 
    def assertDecode(self, exp_offset, exp_length, exp_newpos, data, pos):
473
 
        cmd = indexbytes(data, pos)
474
 
        pos += 1
475
 
        out = _groupcompress_py.decode_copy_instruction(data, cmd, pos)
476
 
        self.assertEqual((exp_offset, exp_length, exp_newpos), out)
477
 
 
478
 
    def test_encode_no_length(self):
479
 
        self.assertEncode(b'\x80', 0, 64 * 1024)
480
 
        self.assertEncode(b'\x81\x01', 1, 64 * 1024)
481
 
        self.assertEncode(b'\x81\x0a', 10, 64 * 1024)
482
 
        self.assertEncode(b'\x81\xff', 255, 64 * 1024)
483
 
        self.assertEncode(b'\x82\x01', 256, 64 * 1024)
484
 
        self.assertEncode(b'\x83\x01\x01', 257, 64 * 1024)
485
 
        self.assertEncode(b'\x8F\xff\xff\xff\xff', 0xFFFFFFFF, 64 * 1024)
486
 
        self.assertEncode(b'\x8E\xff\xff\xff', 0xFFFFFF00, 64 * 1024)
487
 
        self.assertEncode(b'\x8D\xff\xff\xff', 0xFFFF00FF, 64 * 1024)
488
 
        self.assertEncode(b'\x8B\xff\xff\xff', 0xFF00FFFF, 64 * 1024)
489
 
        self.assertEncode(b'\x87\xff\xff\xff', 0x00FFFFFF, 64 * 1024)
490
 
        self.assertEncode(b'\x8F\x04\x03\x02\x01', 0x01020304, 64 * 1024)
491
 
 
492
 
    def test_encode_no_offset(self):
493
 
        self.assertEncode(b'\x90\x01', 0, 1)
494
 
        self.assertEncode(b'\x90\x0a', 0, 10)
495
 
        self.assertEncode(b'\x90\xff', 0, 255)
496
 
        self.assertEncode(b'\xA0\x01', 0, 256)
497
 
        self.assertEncode(b'\xB0\x01\x01', 0, 257)
498
 
        self.assertEncode(b'\xB0\xff\xff', 0, 0xFFFF)
499
 
        # Special case, if copy == 64KiB, then we store exactly 0
500
 
        # Note that this puns with a copy of exactly 0 bytes, but we don't care
501
 
        # about that, as we would never actually copy 0 bytes
502
 
        self.assertEncode(b'\x80', 0, 64 * 1024)
503
 
 
504
 
    def test_encode(self):
505
 
        self.assertEncode(b'\x91\x01\x01', 1, 1)
506
 
        self.assertEncode(b'\x91\x09\x0a', 9, 10)
507
 
        self.assertEncode(b'\x91\xfe\xff', 254, 255)
508
 
        self.assertEncode(b'\xA2\x02\x01', 512, 256)
509
 
        self.assertEncode(b'\xB3\x02\x01\x01\x01', 258, 257)
510
 
        self.assertEncode(b'\xB0\x01\x01', 0, 257)
511
 
        # Special case, if copy == 64KiB, then we store exactly 0
512
 
        # Note that this puns with a copy of exactly 0 bytes, but we don't care
513
 
        # about that, as we would never actually copy 0 bytes
514
 
        self.assertEncode(b'\x81\x0a', 10, 64 * 1024)
515
 
 
516
 
    def test_decode_no_length(self):
517
 
        # If length is 0, it is interpreted as 64KiB
518
 
        # The shortest possible instruction is a copy of 64KiB from offset 0
519
 
        self.assertDecode(0, 65536, 1, b'\x80', 0)
520
 
        self.assertDecode(1, 65536, 2, b'\x81\x01', 0)
521
 
        self.assertDecode(10, 65536, 2, b'\x81\x0a', 0)
522
 
        self.assertDecode(255, 65536, 2, b'\x81\xff', 0)
523
 
        self.assertDecode(256, 65536, 2, b'\x82\x01', 0)
524
 
        self.assertDecode(257, 65536, 3, b'\x83\x01\x01', 0)
525
 
        self.assertDecode(0xFFFFFFFF, 65536, 5, b'\x8F\xff\xff\xff\xff', 0)
526
 
        self.assertDecode(0xFFFFFF00, 65536, 4, b'\x8E\xff\xff\xff', 0)
527
 
        self.assertDecode(0xFFFF00FF, 65536, 4, b'\x8D\xff\xff\xff', 0)
528
 
        self.assertDecode(0xFF00FFFF, 65536, 4, b'\x8B\xff\xff\xff', 0)
529
 
        self.assertDecode(0x00FFFFFF, 65536, 4, b'\x87\xff\xff\xff', 0)
530
 
        self.assertDecode(0x01020304, 65536, 5, b'\x8F\x04\x03\x02\x01', 0)
531
 
 
532
 
    def test_decode_no_offset(self):
533
 
        self.assertDecode(0, 1, 2, b'\x90\x01', 0)
534
 
        self.assertDecode(0, 10, 2, b'\x90\x0a', 0)
535
 
        self.assertDecode(0, 255, 2, b'\x90\xff', 0)
536
 
        self.assertDecode(0, 256, 2, b'\xA0\x01', 0)
537
 
        self.assertDecode(0, 257, 3, b'\xB0\x01\x01', 0)
538
 
        self.assertDecode(0, 65535, 3, b'\xB0\xff\xff', 0)
539
 
        # Special case, if copy == 64KiB, then we store exactly 0
540
 
        # Note that this puns with a copy of exactly 0 bytes, but we don't care
541
 
        # about that, as we would never actually copy 0 bytes
542
 
        self.assertDecode(0, 65536, 1, b'\x80', 0)
543
 
 
544
 
    def test_decode(self):
545
 
        self.assertDecode(1, 1, 3, b'\x91\x01\x01', 0)
546
 
        self.assertDecode(9, 10, 3, b'\x91\x09\x0a', 0)
547
 
        self.assertDecode(254, 255, 3, b'\x91\xfe\xff', 0)
548
 
        self.assertDecode(512, 256, 3, b'\xA2\x02\x01', 0)
549
 
        self.assertDecode(258, 257, 5, b'\xB3\x02\x01\x01\x01', 0)
550
 
        self.assertDecode(0, 257, 3, b'\xB0\x01\x01', 0)
551
 
 
552
 
    def test_decode_not_start(self):
553
 
        self.assertDecode(1, 1, 6, b'abc\x91\x01\x01def', 3)
554
 
        self.assertDecode(9, 10, 5, b'ab\x91\x09\x0ade', 2)
555
 
        self.assertDecode(254, 255, 6, b'not\x91\xfe\xffcopy', 3)
556
 
 
557
 
 
558
 
class TestBase128Int(tests.TestCase):
559
 
 
560
 
    scenarios = module_scenarios()
561
 
 
562
 
    _gc_module = None  # Set by load_tests
563
 
 
564
 
    def assertEqualEncode(self, bytes, val):
565
 
        self.assertEqual(bytes, self._gc_module.encode_base128_int(val))
566
 
 
567
 
    def assertEqualDecode(self, val, num_decode, bytes):
568
 
        self.assertEqual((val, num_decode),
569
 
                         self._gc_module.decode_base128_int(bytes))
570
 
 
571
 
    def test_encode(self):
572
 
        self.assertEqualEncode(b'\x01', 1)
573
 
        self.assertEqualEncode(b'\x02', 2)
574
 
        self.assertEqualEncode(b'\x7f', 127)
575
 
        self.assertEqualEncode(b'\x80\x01', 128)
576
 
        self.assertEqualEncode(b'\xff\x01', 255)
577
 
        self.assertEqualEncode(b'\x80\x02', 256)
578
 
        self.assertEqualEncode(b'\xff\xff\xff\xff\x0f', 0xFFFFFFFF)
579
 
 
580
 
    def test_decode(self):
581
 
        self.assertEqualDecode(1, 1, b'\x01')
582
 
        self.assertEqualDecode(2, 1, b'\x02')
583
 
        self.assertEqualDecode(127, 1, b'\x7f')
584
 
        self.assertEqualDecode(128, 2, b'\x80\x01')
585
 
        self.assertEqualDecode(255, 2, b'\xff\x01')
586
 
        self.assertEqualDecode(256, 2, b'\x80\x02')
587
 
        self.assertEqualDecode(0xFFFFFFFF, 5, b'\xff\xff\xff\xff\x0f')
588
 
 
589
 
    def test_decode_with_trailing_bytes(self):
590
 
        self.assertEqualDecode(1, 1, b'\x01abcdef')
591
 
        self.assertEqualDecode(127, 1, b'\x7f\x01')
592
 
        self.assertEqualDecode(128, 2, b'\x80\x01abcdef')
593
 
        self.assertEqualDecode(255, 2, b'\xff\x01\xff')