/brz/remove-bazaar

To get this branch, use:
bzr branch http://gegoxaren.bato24.eu/bzr/brz/remove-bazaar

« back to all changes in this revision

Viewing changes to breezy/tests/test__groupcompress.py

  • Committer: Jelmer Vernooij
  • Date: 2017-06-10 16:40:42 UTC
  • mfrom: (6653.6.7 rename-controldir)
  • mto: This revision was merged to the branch mainline in revision 6690.
  • Revision ID: jelmer@jelmer.uk-20170610164042-zrxqgy2htyduvke2
MergeĀ rename-controldirĀ branch.

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
# Copyright (C) 2008-2011 Canonical Ltd
 
2
#
 
3
# This program is free software; you can redistribute it and/or modify
 
4
# it under the terms of the GNU General Public License as published by
 
5
# the Free Software Foundation; either version 2 of the License, or
 
6
# (at your option) any later version.
 
7
#
 
8
# This program is distributed in the hope that it will be useful,
 
9
# but WITHOUT ANY WARRANTY; without even the implied warranty of
 
10
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
11
# GNU General Public License for more details.
 
12
#
 
13
# You should have received a copy of the GNU General Public License
 
14
# along with this program; if not, write to the Free Software
 
15
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
 
16
 
 
17
"""Tests for the python and pyrex extensions of groupcompress"""
 
18
 
 
19
from .. import (
 
20
    tests,
 
21
    )
 
22
from ..bzr import (
 
23
    _groupcompress_py,
 
24
    )
 
25
from .scenarios import (
 
26
    load_tests_apply_scenarios,
 
27
    )
 
28
from . import (
 
29
    features,
 
30
    )
 
31
 
 
32
 
 
33
def module_scenarios():
 
34
    scenarios = [
 
35
        ('python', {'_gc_module': _groupcompress_py}),
 
36
        ]
 
37
    if compiled_groupcompress_feature.available():
 
38
        gc_module = compiled_groupcompress_feature.module
 
39
        scenarios.append(('C',
 
40
            {'_gc_module': gc_module}))
 
41
    return scenarios
 
42
 
 
43
 
 
44
def two_way_scenarios():
 
45
    scenarios = [
 
46
        ('PP', {'make_delta': _groupcompress_py.make_delta,
 
47
                'apply_delta': _groupcompress_py.apply_delta})
 
48
        ]
 
49
    if compiled_groupcompress_feature.available():
 
50
        gc_module = compiled_groupcompress_feature.module
 
51
        scenarios.extend([
 
52
            ('CC', {'make_delta': gc_module.make_delta,
 
53
                    'apply_delta': gc_module.apply_delta}),
 
54
            ('PC', {'make_delta': _groupcompress_py.make_delta,
 
55
                    'apply_delta': gc_module.apply_delta}),
 
56
            ('CP', {'make_delta': gc_module.make_delta,
 
57
                    'apply_delta': _groupcompress_py.apply_delta}),
 
58
            ])
 
59
    return scenarios
 
60
 
 
61
 
 
62
load_tests = load_tests_apply_scenarios
 
63
 
 
64
 
 
65
compiled_groupcompress_feature = features.ModuleAvailableFeature(
 
66
    'breezy.bzr._groupcompress_pyx')
 
67
 
 
68
_text1 = """\
 
69
This is a bit
 
70
of source text
 
71
which is meant to be matched
 
72
against other text
 
73
"""
 
74
 
 
75
_text2 = """\
 
76
This is a bit
 
77
of source text
 
78
which is meant to differ from
 
79
against other text
 
80
"""
 
81
 
 
82
_text3 = """\
 
83
This is a bit
 
84
of source text
 
85
which is meant to be matched
 
86
against other text
 
87
except it also
 
88
has a lot more data
 
89
at the end of the file
 
90
"""
 
91
 
 
92
_first_text = """\
 
93
a bit of text, that
 
94
does not have much in
 
95
common with the next text
 
96
"""
 
97
 
 
98
_second_text = """\
 
99
some more bit of text, that
 
100
does not have much in
 
101
common with the previous text
 
102
and has some extra text
 
103
"""
 
104
 
 
105
 
 
106
_third_text = """\
 
107
a bit of text, that
 
108
has some in common with the previous text
 
109
and has some extra text
 
110
and not have much in
 
111
common with the next text
 
112
"""
 
113
 
 
114
_fourth_text = """\
 
115
123456789012345
 
116
same rabin hash
 
117
123456789012345
 
118
same rabin hash
 
119
123456789012345
 
120
same rabin hash
 
121
123456789012345
 
122
same rabin hash
 
123
"""
 
124
 
 
125
class TestMakeAndApplyDelta(tests.TestCase):
 
126
 
 
127
    scenarios = module_scenarios()
 
128
    _gc_module = None # Set by load_tests
 
129
 
 
130
    def setUp(self):
 
131
        super(TestMakeAndApplyDelta, self).setUp()
 
132
        self.make_delta = self._gc_module.make_delta
 
133
        self.apply_delta = self._gc_module.apply_delta
 
134
        self.apply_delta_to_source = self._gc_module.apply_delta_to_source
 
135
 
 
136
    def test_make_delta_is_typesafe(self):
 
137
        self.make_delta('a string', 'another string')
 
138
 
 
139
        def _check_make_delta(string1, string2):
 
140
            self.assertRaises(TypeError, self.make_delta, string1, string2)
 
141
 
 
142
        _check_make_delta('a string', object())
 
143
        _check_make_delta('a string', u'not a string')
 
144
        _check_make_delta(object(), 'a string')
 
145
        _check_make_delta(u'not a string', 'a string')
 
146
 
 
147
    def test_make_noop_delta(self):
 
148
        ident_delta = self.make_delta(_text1, _text1)
 
149
        self.assertEqual('M\x90M', ident_delta)
 
150
        ident_delta = self.make_delta(_text2, _text2)
 
151
        self.assertEqual('N\x90N', ident_delta)
 
152
        ident_delta = self.make_delta(_text3, _text3)
 
153
        self.assertEqual('\x87\x01\x90\x87', ident_delta)
 
154
 
 
155
    def assertDeltaIn(self, delta1, delta2, delta):
 
156
        """Make sure that the delta bytes match one of the expectations."""
 
157
        # In general, the python delta matcher gives different results than the
 
158
        # pyrex delta matcher. Both should be valid deltas, though.
 
159
        if delta not in (delta1, delta2):
 
160
            self.fail("Delta bytes:\n"
 
161
                      "       %r\n"
 
162
                      "not in %r\n"
 
163
                      "    or %r"
 
164
                      % (delta, delta1, delta2))
 
165
 
 
166
    def test_make_delta(self):
 
167
        delta = self.make_delta(_text1, _text2)
 
168
        self.assertDeltaIn(
 
169
            'N\x90/\x1fdiffer from\nagainst other text\n',
 
170
            'N\x90\x1d\x1ewhich is meant to differ from\n\x91:\x13',
 
171
            delta)
 
172
        delta = self.make_delta(_text2, _text1)
 
173
        self.assertDeltaIn(
 
174
            'M\x90/\x1ebe matched\nagainst other text\n',
 
175
            'M\x90\x1d\x1dwhich is meant to be matched\n\x91;\x13',
 
176
            delta)
 
177
        delta = self.make_delta(_text3, _text1)
 
178
        self.assertEqual('M\x90M', delta)
 
179
        delta = self.make_delta(_text3, _text2)
 
180
        self.assertDeltaIn(
 
181
            'N\x90/\x1fdiffer from\nagainst other text\n',
 
182
            'N\x90\x1d\x1ewhich is meant to differ from\n\x91:\x13',
 
183
            delta)
 
184
 
 
185
    def test_make_delta_with_large_copies(self):
 
186
        # We want to have a copy that is larger than 64kB, which forces us to
 
187
        # issue multiple copy instructions.
 
188
        big_text = _text3 * 1220
 
189
        delta = self.make_delta(big_text, big_text)
 
190
        self.assertDeltaIn(
 
191
            '\xdc\x86\x0a'      # Encoding the length of the uncompressed text
 
192
            '\x80'              # Copy 64kB, starting at byte 0
 
193
            '\x84\x01'          # and another 64kB starting at 64kB
 
194
            '\xb4\x02\x5c\x83', # And the bit of tail.
 
195
            None,   # Both implementations should be identical
 
196
            delta)
 
197
 
 
198
    def test_apply_delta_is_typesafe(self):
 
199
        self.apply_delta(_text1, 'M\x90M')
 
200
        self.assertRaises(TypeError, self.apply_delta, object(), 'M\x90M')
 
201
        self.assertRaises(TypeError, self.apply_delta,
 
202
                          unicode(_text1), 'M\x90M')
 
203
        self.assertRaises(TypeError, self.apply_delta, _text1, u'M\x90M')
 
204
        self.assertRaises(TypeError, self.apply_delta, _text1, object())
 
205
 
 
206
    def test_apply_delta(self):
 
207
        target = self.apply_delta(_text1,
 
208
                    'N\x90/\x1fdiffer from\nagainst other text\n')
 
209
        self.assertEqual(_text2, target)
 
210
        target = self.apply_delta(_text2,
 
211
                    'M\x90/\x1ebe matched\nagainst other text\n')
 
212
        self.assertEqual(_text1, target)
 
213
 
 
214
    def test_apply_delta_to_source_is_safe(self):
 
215
        self.assertRaises(TypeError,
 
216
            self.apply_delta_to_source, object(), 0, 1)
 
217
        self.assertRaises(TypeError,
 
218
            self.apply_delta_to_source, u'unicode str', 0, 1)
 
219
        # end > length
 
220
        self.assertRaises(ValueError,
 
221
            self.apply_delta_to_source, 'foo', 1, 4)
 
222
        # start > length
 
223
        self.assertRaises(ValueError,
 
224
            self.apply_delta_to_source, 'foo', 5, 3)
 
225
        # start > end
 
226
        self.assertRaises(ValueError,
 
227
            self.apply_delta_to_source, 'foo', 3, 2)
 
228
 
 
229
    def test_apply_delta_to_source(self):
 
230
        source_and_delta = (_text1
 
231
                            + 'N\x90/\x1fdiffer from\nagainst other text\n')
 
232
        self.assertEqual(_text2, self.apply_delta_to_source(source_and_delta,
 
233
                                    len(_text1), len(source_and_delta)))
 
234
 
 
235
 
 
236
class TestMakeAndApplyCompatible(tests.TestCase):
 
237
 
 
238
    scenarios = two_way_scenarios()
 
239
 
 
240
    make_delta = None # Set by load_tests
 
241
    apply_delta = None # Set by load_tests
 
242
 
 
243
    def assertMakeAndApply(self, source, target):
 
244
        """Assert that generating a delta and applying gives success."""
 
245
        delta = self.make_delta(source, target)
 
246
        bytes = self.apply_delta(source, delta)
 
247
        self.assertEqualDiff(target, bytes)
 
248
 
 
249
    def test_direct(self):
 
250
        self.assertMakeAndApply(_text1, _text2)
 
251
        self.assertMakeAndApply(_text2, _text1)
 
252
        self.assertMakeAndApply(_text1, _text3)
 
253
        self.assertMakeAndApply(_text3, _text1)
 
254
        self.assertMakeAndApply(_text2, _text3)
 
255
        self.assertMakeAndApply(_text3, _text2)
 
256
 
 
257
 
 
258
class TestDeltaIndex(tests.TestCase):
 
259
 
 
260
    def setUp(self):
 
261
        super(TestDeltaIndex, self).setUp()
 
262
        # This test isn't multiplied, because we only have DeltaIndex for the
 
263
        # compiled form
 
264
        # We call this here, because _test_needs_features happens after setUp
 
265
        self.requireFeature(compiled_groupcompress_feature)
 
266
        self._gc_module = compiled_groupcompress_feature.module
 
267
 
 
268
    def test_repr(self):
 
269
        di = self._gc_module.DeltaIndex('test text\n')
 
270
        self.assertEqual('DeltaIndex(1, 10)', repr(di))
 
271
 
 
272
    def test__dump_no_index(self):
 
273
        di = self._gc_module.DeltaIndex()
 
274
        self.assertEqual(None, di._dump_index())
 
275
 
 
276
    def test__dump_index_simple(self):
 
277
        di = self._gc_module.DeltaIndex()
 
278
        di.add_source(_text1, 0)
 
279
        self.assertFalse(di._has_index())
 
280
        self.assertEqual(None, di._dump_index())
 
281
        _ = di.make_delta(_text1)
 
282
        self.assertTrue(di._has_index())
 
283
        hash_list, entry_list = di._dump_index()
 
284
        self.assertEqual(16, len(hash_list))
 
285
        self.assertEqual(68, len(entry_list))
 
286
        just_entries = [(idx, text_offset, hash_val)
 
287
                        for idx, (text_offset, hash_val)
 
288
                         in enumerate(entry_list)
 
289
                         if text_offset != 0 or hash_val != 0]
 
290
        rabin_hash = self._gc_module._rabin_hash
 
291
        self.assertEqual([(8, 16, rabin_hash(_text1[1:17])),
 
292
                          (25, 48, rabin_hash(_text1[33:49])),
 
293
                          (34, 32, rabin_hash(_text1[17:33])),
 
294
                          (47, 64, rabin_hash(_text1[49:65])),
 
295
                         ], just_entries)
 
296
        # This ensures that the hash map points to the location we expect it to
 
297
        for entry_idx, text_offset, hash_val in just_entries:
 
298
            self.assertEqual(entry_idx, hash_list[hash_val & 0xf])
 
299
 
 
300
    def test__dump_index_two_sources(self):
 
301
        di = self._gc_module.DeltaIndex()
 
302
        di.add_source(_text1, 0)
 
303
        di.add_source(_text2, 2)
 
304
        start2 = len(_text1) + 2
 
305
        self.assertTrue(di._has_index())
 
306
        hash_list, entry_list = di._dump_index()
 
307
        self.assertEqual(16, len(hash_list))
 
308
        self.assertEqual(68, len(entry_list))
 
309
        just_entries = [(idx, text_offset, hash_val)
 
310
                        for idx, (text_offset, hash_val)
 
311
                         in enumerate(entry_list)
 
312
                         if text_offset != 0 or hash_val != 0]
 
313
        rabin_hash = self._gc_module._rabin_hash
 
314
        self.assertEqual([(8, 16, rabin_hash(_text1[1:17])),
 
315
                          (9, start2+16, rabin_hash(_text2[1:17])),
 
316
                          (25, 48, rabin_hash(_text1[33:49])),
 
317
                          (30, start2+64, rabin_hash(_text2[49:65])),
 
318
                          (34, 32, rabin_hash(_text1[17:33])),
 
319
                          (35, start2+32, rabin_hash(_text2[17:33])),
 
320
                          (43, start2+48, rabin_hash(_text2[33:49])),
 
321
                          (47, 64, rabin_hash(_text1[49:65])),
 
322
                         ], just_entries)
 
323
        # Each entry should be in the appropriate hash bucket.
 
324
        for entry_idx, text_offset, hash_val in just_entries:
 
325
            hash_idx = hash_val & 0xf
 
326
            self.assertTrue(
 
327
                hash_list[hash_idx] <= entry_idx < hash_list[hash_idx+1])
 
328
 
 
329
    def test_first_add_source_doesnt_index_until_make_delta(self):
 
330
        di = self._gc_module.DeltaIndex()
 
331
        self.assertFalse(di._has_index())
 
332
        di.add_source(_text1, 0)
 
333
        self.assertFalse(di._has_index())
 
334
        # However, asking to make a delta will trigger the index to be
 
335
        # generated, and will generate a proper delta
 
336
        delta = di.make_delta(_text2)
 
337
        self.assertTrue(di._has_index())
 
338
        self.assertEqual('N\x90/\x1fdiffer from\nagainst other text\n', delta)
 
339
 
 
340
    def test_add_source_max_bytes_to_index(self):
 
341
        di = self._gc_module.DeltaIndex()
 
342
        di._max_bytes_to_index = 3*16
 
343
        di.add_source(_text1, 0) # (77 bytes -1) // 3 = 25 byte stride
 
344
        di.add_source(_text3, 3) # (135 bytes -1) // 3 = 44 byte stride
 
345
        start2 = len(_text1) + 3
 
346
        hash_list, entry_list = di._dump_index()
 
347
        self.assertEqual(16, len(hash_list))
 
348
        self.assertEqual(67, len(entry_list))
 
349
        just_entries = sorted([(text_offset, hash_val)
 
350
                               for text_offset, hash_val in entry_list
 
351
                                if text_offset != 0 or hash_val != 0])
 
352
        rabin_hash = self._gc_module._rabin_hash
 
353
        self.assertEqual([(25, rabin_hash(_text1[10:26])),
 
354
                          (50, rabin_hash(_text1[35:51])),
 
355
                          (75, rabin_hash(_text1[60:76])),
 
356
                          (start2+44, rabin_hash(_text3[29:45])),
 
357
                          (start2+88, rabin_hash(_text3[73:89])),
 
358
                          (start2+132, rabin_hash(_text3[117:133])),
 
359
                         ], just_entries)
 
360
 
 
361
    def test_second_add_source_triggers_make_index(self):
 
362
        di = self._gc_module.DeltaIndex()
 
363
        self.assertFalse(di._has_index())
 
364
        di.add_source(_text1, 0)
 
365
        self.assertFalse(di._has_index())
 
366
        di.add_source(_text2, 0)
 
367
        self.assertTrue(di._has_index())
 
368
 
 
369
    def test_make_delta(self):
 
370
        di = self._gc_module.DeltaIndex(_text1)
 
371
        delta = di.make_delta(_text2)
 
372
        self.assertEqual('N\x90/\x1fdiffer from\nagainst other text\n', delta)
 
373
 
 
374
    def test_delta_against_multiple_sources(self):
 
375
        di = self._gc_module.DeltaIndex()
 
376
        di.add_source(_first_text, 0)
 
377
        self.assertEqual(len(_first_text), di._source_offset)
 
378
        di.add_source(_second_text, 0)
 
379
        self.assertEqual(len(_first_text) + len(_second_text),
 
380
                         di._source_offset)
 
381
        delta = di.make_delta(_third_text)
 
382
        result = self._gc_module.apply_delta(_first_text + _second_text, delta)
 
383
        self.assertEqualDiff(_third_text, result)
 
384
        self.assertEqual('\x85\x01\x90\x14\x0chas some in '
 
385
                         '\x91v6\x03and\x91d"\x91:\n', delta)
 
386
 
 
387
    def test_delta_with_offsets(self):
 
388
        di = self._gc_module.DeltaIndex()
 
389
        di.add_source(_first_text, 5)
 
390
        self.assertEqual(len(_first_text) + 5, di._source_offset)
 
391
        di.add_source(_second_text, 10)
 
392
        self.assertEqual(len(_first_text) + len(_second_text) + 15,
 
393
                         di._source_offset)
 
394
        delta = di.make_delta(_third_text)
 
395
        self.assertIsNot(None, delta)
 
396
        result = self._gc_module.apply_delta(
 
397
            '12345' + _first_text + '1234567890' + _second_text, delta)
 
398
        self.assertIsNot(None, result)
 
399
        self.assertEqualDiff(_third_text, result)
 
400
        self.assertEqual('\x85\x01\x91\x05\x14\x0chas some in '
 
401
                         '\x91\x856\x03and\x91s"\x91?\n', delta)
 
402
 
 
403
    def test_delta_with_delta_bytes(self):
 
404
        di = self._gc_module.DeltaIndex()
 
405
        source = _first_text
 
406
        di.add_source(_first_text, 0)
 
407
        self.assertEqual(len(_first_text), di._source_offset)
 
408
        delta = di.make_delta(_second_text)
 
409
        self.assertEqual('h\tsome more\x91\x019'
 
410
                         '&previous text\nand has some extra text\n', delta)
 
411
        di.add_delta_source(delta, 0)
 
412
        source += delta
 
413
        self.assertEqual(len(_first_text) + len(delta), di._source_offset)
 
414
        second_delta = di.make_delta(_third_text)
 
415
        result = self._gc_module.apply_delta(source, second_delta)
 
416
        self.assertEqualDiff(_third_text, result)
 
417
        # We should be able to match against the
 
418
        # 'previous text\nand has some...'  that was part of the delta bytes
 
419
        # Note that we don't match the 'common with the', because it isn't long
 
420
        # enough to match in the original text, and those bytes are not present
 
421
        # in the delta for the second text.
 
422
        self.assertEqual('\x85\x01\x90\x14\x1chas some in common with the '
 
423
                         '\x91S&\x03and\x91\x18,', second_delta)
 
424
        # Add this delta, and create a new delta for the same text. We should
 
425
        # find the remaining text, and only insert the short 'and' text.
 
426
        di.add_delta_source(second_delta, 0)
 
427
        source += second_delta
 
428
        third_delta = di.make_delta(_third_text)
 
429
        result = self._gc_module.apply_delta(source, third_delta)
 
430
        self.assertEqualDiff(_third_text, result)
 
431
        self.assertEqual('\x85\x01\x90\x14\x91\x7e\x1c'
 
432
                         '\x91S&\x03and\x91\x18,', third_delta)
 
433
        # Now create a delta, which we know won't be able to be 'fit' into the
 
434
        # existing index
 
435
        fourth_delta = di.make_delta(_fourth_text)
 
436
        self.assertEqual(_fourth_text,
 
437
                         self._gc_module.apply_delta(source, fourth_delta))
 
438
        self.assertEqual('\x80\x01'
 
439
                         '\x7f123456789012345\nsame rabin hash\n'
 
440
                         '123456789012345\nsame rabin hash\n'
 
441
                         '123456789012345\nsame rabin hash\n'
 
442
                         '123456789012345\nsame rabin hash'
 
443
                         '\x01\n', fourth_delta)
 
444
        di.add_delta_source(fourth_delta, 0)
 
445
        source += fourth_delta
 
446
        # With the next delta, everything should be found
 
447
        fifth_delta = di.make_delta(_fourth_text)
 
448
        self.assertEqual(_fourth_text,
 
449
                         self._gc_module.apply_delta(source, fifth_delta))
 
450
        self.assertEqual('\x80\x01\x91\xa7\x7f\x01\n', fifth_delta)
 
451
 
 
452
 
 
453
class TestCopyInstruction(tests.TestCase):
 
454
 
 
455
    def assertEncode(self, expected, offset, length):
 
456
        bytes = _groupcompress_py.encode_copy_instruction(offset, length)
 
457
        if expected != bytes:
 
458
            self.assertEqual([hex(ord(e)) for e in expected],
 
459
                             [hex(ord(b)) for b in bytes])
 
460
 
 
461
    def assertDecode(self, exp_offset, exp_length, exp_newpos, bytes, pos):
 
462
        cmd = ord(bytes[pos])
 
463
        pos += 1
 
464
        out = _groupcompress_py.decode_copy_instruction(bytes, cmd, pos)
 
465
        self.assertEqual((exp_offset, exp_length, exp_newpos), out)
 
466
 
 
467
    def test_encode_no_length(self):
 
468
        self.assertEncode('\x80', 0, 64*1024)
 
469
        self.assertEncode('\x81\x01', 1, 64*1024)
 
470
        self.assertEncode('\x81\x0a', 10, 64*1024)
 
471
        self.assertEncode('\x81\xff', 255, 64*1024)
 
472
        self.assertEncode('\x82\x01', 256, 64*1024)
 
473
        self.assertEncode('\x83\x01\x01', 257, 64*1024)
 
474
        self.assertEncode('\x8F\xff\xff\xff\xff', 0xFFFFFFFF, 64*1024)
 
475
        self.assertEncode('\x8E\xff\xff\xff', 0xFFFFFF00, 64*1024)
 
476
        self.assertEncode('\x8D\xff\xff\xff', 0xFFFF00FF, 64*1024)
 
477
        self.assertEncode('\x8B\xff\xff\xff', 0xFF00FFFF, 64*1024)
 
478
        self.assertEncode('\x87\xff\xff\xff', 0x00FFFFFF, 64*1024)
 
479
        self.assertEncode('\x8F\x04\x03\x02\x01', 0x01020304, 64*1024)
 
480
 
 
481
    def test_encode_no_offset(self):
 
482
        self.assertEncode('\x90\x01', 0, 1)
 
483
        self.assertEncode('\x90\x0a', 0, 10)
 
484
        self.assertEncode('\x90\xff', 0, 255)
 
485
        self.assertEncode('\xA0\x01', 0, 256)
 
486
        self.assertEncode('\xB0\x01\x01', 0, 257)
 
487
        self.assertEncode('\xB0\xff\xff', 0, 0xFFFF)
 
488
        # Special case, if copy == 64KiB, then we store exactly 0
 
489
        # Note that this puns with a copy of exactly 0 bytes, but we don't care
 
490
        # about that, as we would never actually copy 0 bytes
 
491
        self.assertEncode('\x80', 0, 64*1024)
 
492
 
 
493
    def test_encode(self):
 
494
        self.assertEncode('\x91\x01\x01', 1, 1)
 
495
        self.assertEncode('\x91\x09\x0a', 9, 10)
 
496
        self.assertEncode('\x91\xfe\xff', 254, 255)
 
497
        self.assertEncode('\xA2\x02\x01', 512, 256)
 
498
        self.assertEncode('\xB3\x02\x01\x01\x01', 258, 257)
 
499
        self.assertEncode('\xB0\x01\x01', 0, 257)
 
500
        # Special case, if copy == 64KiB, then we store exactly 0
 
501
        # Note that this puns with a copy of exactly 0 bytes, but we don't care
 
502
        # about that, as we would never actually copy 0 bytes
 
503
        self.assertEncode('\x81\x0a', 10, 64*1024)
 
504
 
 
505
    def test_decode_no_length(self):
 
506
        # If length is 0, it is interpreted as 64KiB
 
507
        # The shortest possible instruction is a copy of 64KiB from offset 0
 
508
        self.assertDecode(0, 65536, 1, '\x80', 0)
 
509
        self.assertDecode(1, 65536, 2, '\x81\x01', 0)
 
510
        self.assertDecode(10, 65536, 2, '\x81\x0a', 0)
 
511
        self.assertDecode(255, 65536, 2, '\x81\xff', 0)
 
512
        self.assertDecode(256, 65536, 2, '\x82\x01', 0)
 
513
        self.assertDecode(257, 65536, 3, '\x83\x01\x01', 0)
 
514
        self.assertDecode(0xFFFFFFFF, 65536, 5, '\x8F\xff\xff\xff\xff', 0)
 
515
        self.assertDecode(0xFFFFFF00, 65536, 4, '\x8E\xff\xff\xff', 0)
 
516
        self.assertDecode(0xFFFF00FF, 65536, 4, '\x8D\xff\xff\xff', 0)
 
517
        self.assertDecode(0xFF00FFFF, 65536, 4, '\x8B\xff\xff\xff', 0)
 
518
        self.assertDecode(0x00FFFFFF, 65536, 4, '\x87\xff\xff\xff', 0)
 
519
        self.assertDecode(0x01020304, 65536, 5, '\x8F\x04\x03\x02\x01', 0)
 
520
 
 
521
    def test_decode_no_offset(self):
 
522
        self.assertDecode(0, 1, 2, '\x90\x01', 0)
 
523
        self.assertDecode(0, 10, 2, '\x90\x0a', 0)
 
524
        self.assertDecode(0, 255, 2, '\x90\xff', 0)
 
525
        self.assertDecode(0, 256, 2, '\xA0\x01', 0)
 
526
        self.assertDecode(0, 257, 3, '\xB0\x01\x01', 0)
 
527
        self.assertDecode(0, 65535, 3, '\xB0\xff\xff', 0)
 
528
        # Special case, if copy == 64KiB, then we store exactly 0
 
529
        # Note that this puns with a copy of exactly 0 bytes, but we don't care
 
530
        # about that, as we would never actually copy 0 bytes
 
531
        self.assertDecode(0, 65536, 1, '\x80', 0)
 
532
 
 
533
    def test_decode(self):
 
534
        self.assertDecode(1, 1, 3, '\x91\x01\x01', 0)
 
535
        self.assertDecode(9, 10, 3, '\x91\x09\x0a', 0)
 
536
        self.assertDecode(254, 255, 3, '\x91\xfe\xff', 0)
 
537
        self.assertDecode(512, 256, 3, '\xA2\x02\x01', 0)
 
538
        self.assertDecode(258, 257, 5, '\xB3\x02\x01\x01\x01', 0)
 
539
        self.assertDecode(0, 257, 3, '\xB0\x01\x01', 0)
 
540
 
 
541
    def test_decode_not_start(self):
 
542
        self.assertDecode(1, 1, 6, 'abc\x91\x01\x01def', 3)
 
543
        self.assertDecode(9, 10, 5, 'ab\x91\x09\x0ade', 2)
 
544
        self.assertDecode(254, 255, 6, 'not\x91\xfe\xffcopy', 3)
 
545
 
 
546
 
 
547
class TestBase128Int(tests.TestCase):
 
548
 
 
549
    scenarios = module_scenarios()
 
550
 
 
551
    _gc_module = None # Set by load_tests
 
552
 
 
553
    def assertEqualEncode(self, bytes, val):
 
554
        self.assertEqual(bytes, self._gc_module.encode_base128_int(val))
 
555
 
 
556
    def assertEqualDecode(self, val, num_decode, bytes):
 
557
        self.assertEqual((val, num_decode),
 
558
                         self._gc_module.decode_base128_int(bytes))
 
559
 
 
560
    def test_encode(self):
 
561
        self.assertEqualEncode('\x01', 1)
 
562
        self.assertEqualEncode('\x02', 2)
 
563
        self.assertEqualEncode('\x7f', 127)
 
564
        self.assertEqualEncode('\x80\x01', 128)
 
565
        self.assertEqualEncode('\xff\x01', 255)
 
566
        self.assertEqualEncode('\x80\x02', 256)
 
567
        self.assertEqualEncode('\xff\xff\xff\xff\x0f', 0xFFFFFFFF)
 
568
 
 
569
    def test_decode(self):
 
570
        self.assertEqualDecode(1, 1, '\x01')
 
571
        self.assertEqualDecode(2, 1, '\x02')
 
572
        self.assertEqualDecode(127, 1, '\x7f')
 
573
        self.assertEqualDecode(128, 2, '\x80\x01')
 
574
        self.assertEqualDecode(255, 2, '\xff\x01')
 
575
        self.assertEqualDecode(256, 2, '\x80\x02')
 
576
        self.assertEqualDecode(0xFFFFFFFF, 5, '\xff\xff\xff\xff\x0f')
 
577
 
 
578
    def test_decode_with_trailing_bytes(self):
 
579
        self.assertEqualDecode(1, 1, '\x01abcdef')
 
580
        self.assertEqualDecode(127, 1, '\x7f\x01')
 
581
        self.assertEqualDecode(128, 2, '\x80\x01abcdef')
 
582
        self.assertEqualDecode(255, 2, '\xff\x01\xff')
 
583
 
 
584