/brz/remove-bazaar

To get this branch, use:
bzr branch http://gegoxaren.bato24.eu/bzr/brz/remove-bazaar

« back to all changes in this revision

Viewing changes to bzrlib/tests/test__groupcompress.py

  • Committer: Robert Collins
  • Date: 2010-05-05 00:05:29 UTC
  • mto: This revision was merged to the branch mainline in revision 5206.
  • Revision ID: robertc@robertcollins.net-20100505000529-ltmllyms5watqj5u
Make 'pydoc bzrlib.tests.build_tree_shape' useful.

Show diffs side-by-side

added added

removed removed

Lines of Context:
1
 
# Copyright (C) 2008-2011 Canonical Ltd
 
1
# Copyright (C) 2008, 2009, 2010 Canonical Ltd
2
2
#
3
3
# This program is free software; you can redistribute it and/or modify
4
4
# it under the terms of the GNU General Public License as published by
16
16
 
17
17
"""Tests for the python and pyrex extensions of groupcompress"""
18
18
 
19
 
import sys
20
 
 
21
 
from .. import (
 
19
from bzrlib import (
 
20
    groupcompress,
 
21
    _groupcompress_py,
22
22
    tests,
23
23
    )
24
 
from ..bzr import (
25
 
    _groupcompress_py,
26
 
    )
27
 
from .scenarios import (
28
 
    load_tests_apply_scenarios,
29
 
    )
30
 
from ..sixish import (
31
 
    indexbytes,
32
 
    )
33
 
from . import (
34
 
    features,
35
 
    )
36
 
 
37
 
 
38
 
def module_scenarios():
 
24
 
 
25
 
 
26
def load_tests(standard_tests, module, loader):
 
27
    """Parameterize tests for all versions of groupcompress."""
 
28
    two_way_scenarios = [
 
29
        ('PP', {'make_delta': _groupcompress_py.make_delta,
 
30
                'apply_delta': _groupcompress_py.apply_delta})
 
31
        ]
39
32
    scenarios = [
40
33
        ('python', {'_gc_module': _groupcompress_py}),
41
34
        ]
42
35
    if compiled_groupcompress_feature.available():
43
36
        gc_module = compiled_groupcompress_feature.module
44
37
        scenarios.append(('C',
45
 
                          {'_gc_module': gc_module}))
46
 
    return scenarios
47
 
 
48
 
 
49
 
def two_way_scenarios():
50
 
    scenarios = [
51
 
        ('PP', {'make_delta': _groupcompress_py.make_delta,
52
 
                'apply_delta': _groupcompress_py.apply_delta})
53
 
        ]
54
 
    if compiled_groupcompress_feature.available():
55
 
        gc_module = compiled_groupcompress_feature.module
56
 
        scenarios.extend([
 
38
            {'_gc_module': gc_module}))
 
39
        two_way_scenarios.extend([
57
40
            ('CC', {'make_delta': gc_module.make_delta,
58
41
                    'apply_delta': gc_module.apply_delta}),
59
42
            ('PC', {'make_delta': _groupcompress_py.make_delta,
61
44
            ('CP', {'make_delta': gc_module.make_delta,
62
45
                    'apply_delta': _groupcompress_py.apply_delta}),
63
46
            ])
64
 
    return scenarios
65
 
 
66
 
 
67
 
load_tests = load_tests_apply_scenarios
68
 
 
69
 
 
70
 
compiled_groupcompress_feature = features.ModuleAvailableFeature(
71
 
    'breezy.bzr._groupcompress_pyx')
72
 
 
73
 
_text1 = b"""\
 
47
    to_adapt, result = tests.split_suite_by_condition(
 
48
        standard_tests, tests.condition_isinstance((TestMakeAndApplyDelta,
 
49
                                                    TestBase128Int)))
 
50
    result = tests.multiply_tests(to_adapt, scenarios, result)
 
51
    to_adapt, result = tests.split_suite_by_condition(result,
 
52
        tests.condition_isinstance(TestMakeAndApplyCompatible))
 
53
    result = tests.multiply_tests(to_adapt, two_way_scenarios, result)
 
54
    return result
 
55
 
 
56
 
 
57
compiled_groupcompress_feature = tests.ModuleAvailableFeature(
 
58
                                    'bzrlib._groupcompress_pyx')
 
59
 
 
60
_text1 = """\
74
61
This is a bit
75
62
of source text
76
63
which is meant to be matched
77
64
against other text
78
65
"""
79
66
 
80
 
_text2 = b"""\
 
67
_text2 = """\
81
68
This is a bit
82
69
of source text
83
70
which is meant to differ from
84
71
against other text
85
72
"""
86
73
 
87
 
_text3 = b"""\
 
74
_text3 = """\
88
75
This is a bit
89
76
of source text
90
77
which is meant to be matched
94
81
at the end of the file
95
82
"""
96
83
 
97
 
_first_text = b"""\
 
84
_first_text = """\
98
85
a bit of text, that
99
86
does not have much in
100
87
common with the next text
101
88
"""
102
89
 
103
 
_second_text = b"""\
 
90
_second_text = """\
104
91
some more bit of text, that
105
92
does not have much in
106
93
common with the previous text
108
95
"""
109
96
 
110
97
 
111
 
_third_text = b"""\
 
98
_third_text = """\
112
99
a bit of text, that
113
100
has some in common with the previous text
114
101
and has some extra text
116
103
common with the next text
117
104
"""
118
105
 
119
 
_fourth_text = b"""\
 
106
_fourth_text = """\
120
107
123456789012345
121
108
same rabin hash
122
109
123456789012345
127
114
same rabin hash
128
115
"""
129
116
 
130
 
 
131
117
class TestMakeAndApplyDelta(tests.TestCase):
132
118
 
133
 
    scenarios = module_scenarios()
134
 
    _gc_module = None  # Set by load_tests
 
119
    _gc_module = None # Set by load_tests
135
120
 
136
121
    def setUp(self):
137
122
        super(TestMakeAndApplyDelta, self).setUp()
140
125
        self.apply_delta_to_source = self._gc_module.apply_delta_to_source
141
126
 
142
127
    def test_make_delta_is_typesafe(self):
143
 
        self.make_delta(b'a string', b'another string')
 
128
        self.make_delta('a string', 'another string')
144
129
 
145
130
        def _check_make_delta(string1, string2):
146
131
            self.assertRaises(TypeError, self.make_delta, string1, string2)
147
132
 
148
 
        _check_make_delta(b'a string', object())
149
 
        _check_make_delta(b'a string', u'not a string')
150
 
        _check_make_delta(object(), b'a string')
151
 
        _check_make_delta(u'not a string', b'a string')
 
133
        _check_make_delta('a string', object())
 
134
        _check_make_delta('a string', u'not a string')
 
135
        _check_make_delta(object(), 'a string')
 
136
        _check_make_delta(u'not a string', 'a string')
152
137
 
153
138
    def test_make_noop_delta(self):
154
139
        ident_delta = self.make_delta(_text1, _text1)
155
 
        self.assertEqual(b'M\x90M', ident_delta)
 
140
        self.assertEqual('M\x90M', ident_delta)
156
141
        ident_delta = self.make_delta(_text2, _text2)
157
 
        self.assertEqual(b'N\x90N', ident_delta)
 
142
        self.assertEqual('N\x90N', ident_delta)
158
143
        ident_delta = self.make_delta(_text3, _text3)
159
 
        self.assertEqual(b'\x87\x01\x90\x87', ident_delta)
 
144
        self.assertEqual('\x87\x01\x90\x87', ident_delta)
160
145
 
161
146
    def assertDeltaIn(self, delta1, delta2, delta):
162
147
        """Make sure that the delta bytes match one of the expectations."""
163
148
        # In general, the python delta matcher gives different results than the
164
149
        # pyrex delta matcher. Both should be valid deltas, though.
165
150
        if delta not in (delta1, delta2):
166
 
            self.fail(b"Delta bytes:\n"
167
 
                      b"       %r\n"
168
 
                      b"not in %r\n"
169
 
                      b"    or %r"
 
151
            self.fail("Delta bytes:\n"
 
152
                      "       %r\n"
 
153
                      "not in %r\n"
 
154
                      "    or %r"
170
155
                      % (delta, delta1, delta2))
171
156
 
172
157
    def test_make_delta(self):
173
158
        delta = self.make_delta(_text1, _text2)
174
159
        self.assertDeltaIn(
175
 
            b'N\x90/\x1fdiffer from\nagainst other text\n',
176
 
            b'N\x90\x1d\x1ewhich is meant to differ from\n\x91:\x13',
 
160
            'N\x90/\x1fdiffer from\nagainst other text\n',
 
161
            'N\x90\x1d\x1ewhich is meant to differ from\n\x91:\x13',
177
162
            delta)
178
163
        delta = self.make_delta(_text2, _text1)
179
164
        self.assertDeltaIn(
180
 
            b'M\x90/\x1ebe matched\nagainst other text\n',
181
 
            b'M\x90\x1d\x1dwhich is meant to be matched\n\x91;\x13',
 
165
            'M\x90/\x1ebe matched\nagainst other text\n',
 
166
            'M\x90\x1d\x1dwhich is meant to be matched\n\x91;\x13',
182
167
            delta)
183
168
        delta = self.make_delta(_text3, _text1)
184
 
        self.assertEqual(b'M\x90M', delta)
 
169
        self.assertEqual('M\x90M', delta)
185
170
        delta = self.make_delta(_text3, _text2)
186
171
        self.assertDeltaIn(
187
 
            b'N\x90/\x1fdiffer from\nagainst other text\n',
188
 
            b'N\x90\x1d\x1ewhich is meant to differ from\n\x91:\x13',
 
172
            'N\x90/\x1fdiffer from\nagainst other text\n',
 
173
            'N\x90\x1d\x1ewhich is meant to differ from\n\x91:\x13',
189
174
            delta)
190
175
 
191
176
    def test_make_delta_with_large_copies(self):
194
179
        big_text = _text3 * 1220
195
180
        delta = self.make_delta(big_text, big_text)
196
181
        self.assertDeltaIn(
197
 
            b'\xdc\x86\x0a'      # Encoding the length of the uncompressed text
198
 
            b'\x80'              # Copy 64kB, starting at byte 0
199
 
            b'\x84\x01'          # and another 64kB starting at 64kB
200
 
            b'\xb4\x02\x5c\x83',  # And the bit of tail.
 
182
            '\xdc\x86\x0a'      # Encoding the length of the uncompressed text
 
183
            '\x80'              # Copy 64kB, starting at byte 0
 
184
            '\x84\x01'          # and another 64kB starting at 64kB
 
185
            '\xb4\x02\x5c\x83', # And the bit of tail.
201
186
            None,   # Both implementations should be identical
202
187
            delta)
203
188
 
204
189
    def test_apply_delta_is_typesafe(self):
205
 
        self.apply_delta(_text1, b'M\x90M')
206
 
        self.assertRaises(TypeError, self.apply_delta, object(), b'M\x90M')
 
190
        self.apply_delta(_text1, 'M\x90M')
 
191
        self.assertRaises(TypeError, self.apply_delta, object(), 'M\x90M')
207
192
        self.assertRaises(TypeError, self.apply_delta,
208
 
                          _text1.decode('latin1'), b'M\x90M')
 
193
                          unicode(_text1), 'M\x90M')
209
194
        self.assertRaises(TypeError, self.apply_delta, _text1, u'M\x90M')
210
195
        self.assertRaises(TypeError, self.apply_delta, _text1, object())
211
196
 
212
197
    def test_apply_delta(self):
213
198
        target = self.apply_delta(_text1,
214
 
                                  b'N\x90/\x1fdiffer from\nagainst other text\n')
 
199
                    'N\x90/\x1fdiffer from\nagainst other text\n')
215
200
        self.assertEqual(_text2, target)
216
201
        target = self.apply_delta(_text2,
217
 
                                  b'M\x90/\x1ebe matched\nagainst other text\n')
 
202
                    'M\x90/\x1ebe matched\nagainst other text\n')
218
203
        self.assertEqual(_text1, target)
219
204
 
220
205
    def test_apply_delta_to_source_is_safe(self):
221
206
        self.assertRaises(TypeError,
222
 
                          self.apply_delta_to_source, object(), 0, 1)
 
207
            self.apply_delta_to_source, object(), 0, 1)
223
208
        self.assertRaises(TypeError,
224
 
                          self.apply_delta_to_source, u'unicode str', 0, 1)
 
209
            self.apply_delta_to_source, u'unicode str', 0, 1)
225
210
        # end > length
226
211
        self.assertRaises(ValueError,
227
 
                          self.apply_delta_to_source, b'foo', 1, 4)
 
212
            self.apply_delta_to_source, 'foo', 1, 4)
228
213
        # start > length
229
214
        self.assertRaises(ValueError,
230
 
                          self.apply_delta_to_source, b'foo', 5, 3)
 
215
            self.apply_delta_to_source, 'foo', 5, 3)
231
216
        # start > end
232
217
        self.assertRaises(ValueError,
233
 
                          self.apply_delta_to_source, b'foo', 3, 2)
 
218
            self.apply_delta_to_source, 'foo', 3, 2)
234
219
 
235
220
    def test_apply_delta_to_source(self):
236
221
        source_and_delta = (_text1
237
 
                            + b'N\x90/\x1fdiffer from\nagainst other text\n')
 
222
                            + 'N\x90/\x1fdiffer from\nagainst other text\n')
238
223
        self.assertEqual(_text2, self.apply_delta_to_source(source_and_delta,
239
 
                                                            len(_text1), len(source_and_delta)))
 
224
                                    len(_text1), len(source_and_delta)))
240
225
 
241
226
 
242
227
class TestMakeAndApplyCompatible(tests.TestCase):
243
228
 
244
 
    scenarios = two_way_scenarios()
245
 
 
246
 
    make_delta = None  # Set by load_tests
247
 
    apply_delta = None  # Set by load_tests
 
229
    make_delta = None # Set by load_tests
 
230
    apply_delta = None # Set by load_tests
248
231
 
249
232
    def assertMakeAndApply(self, source, target):
250
233
        """Assert that generating a delta and applying gives success."""
272
255
        self._gc_module = compiled_groupcompress_feature.module
273
256
 
274
257
    def test_repr(self):
275
 
        di = self._gc_module.DeltaIndex(b'test text\n')
 
258
        di = self._gc_module.DeltaIndex('test text\n')
276
259
        self.assertEqual('DeltaIndex(1, 10)', repr(di))
277
260
 
278
 
    def test_sizeof(self):
279
 
        di = self._gc_module.DeltaIndex()
280
 
        # Exact value will depend on platform but should include sources
281
 
        # source_info is a pointer and two longs so at least 12 bytes
282
 
        lower_bound = di._max_num_sources * 12
283
 
        self.assertGreater(sys.getsizeof(di), lower_bound)
284
 
 
285
 
    def test__dump_no_index(self):
286
 
        di = self._gc_module.DeltaIndex()
287
 
        self.assertEqual(None, di._dump_index())
288
 
 
289
 
    def test__dump_index_simple(self):
290
 
        di = self._gc_module.DeltaIndex()
291
 
        di.add_source(_text1, 0)
292
 
        self.assertFalse(di._has_index())
293
 
        self.assertEqual(None, di._dump_index())
294
 
        _ = di.make_delta(_text1)
295
 
        self.assertTrue(di._has_index())
296
 
        hash_list, entry_list = di._dump_index()
297
 
        self.assertEqual(16, len(hash_list))
298
 
        self.assertEqual(68, len(entry_list))
299
 
        just_entries = [(idx, text_offset, hash_val)
300
 
                        for idx, (text_offset, hash_val)
301
 
                        in enumerate(entry_list)
302
 
                        if text_offset != 0 or hash_val != 0]
303
 
        rabin_hash = self._gc_module._rabin_hash
304
 
        self.assertEqual([(8, 16, rabin_hash(_text1[1:17])),
305
 
                          (25, 48, rabin_hash(_text1[33:49])),
306
 
                          (34, 32, rabin_hash(_text1[17:33])),
307
 
                          (47, 64, rabin_hash(_text1[49:65])),
308
 
                          ], just_entries)
309
 
        # This ensures that the hash map points to the location we expect it to
310
 
        for entry_idx, text_offset, hash_val in just_entries:
311
 
            self.assertEqual(entry_idx, hash_list[hash_val & 0xf])
312
 
 
313
 
    def test__dump_index_two_sources(self):
314
 
        di = self._gc_module.DeltaIndex()
315
 
        di.add_source(_text1, 0)
316
 
        di.add_source(_text2, 2)
317
 
        start2 = len(_text1) + 2
318
 
        self.assertTrue(di._has_index())
319
 
        hash_list, entry_list = di._dump_index()
320
 
        self.assertEqual(16, len(hash_list))
321
 
        self.assertEqual(68, len(entry_list))
322
 
        just_entries = [(idx, text_offset, hash_val)
323
 
                        for idx, (text_offset, hash_val)
324
 
                        in enumerate(entry_list)
325
 
                        if text_offset != 0 or hash_val != 0]
326
 
        rabin_hash = self._gc_module._rabin_hash
327
 
        self.assertEqual([(8, 16, rabin_hash(_text1[1:17])),
328
 
                          (9, start2 + 16, rabin_hash(_text2[1:17])),
329
 
                          (25, 48, rabin_hash(_text1[33:49])),
330
 
                          (30, start2 + 64, rabin_hash(_text2[49:65])),
331
 
                          (34, 32, rabin_hash(_text1[17:33])),
332
 
                          (35, start2 + 32, rabin_hash(_text2[17:33])),
333
 
                          (43, start2 + 48, rabin_hash(_text2[33:49])),
334
 
                          (47, 64, rabin_hash(_text1[49:65])),
335
 
                          ], just_entries)
336
 
        # Each entry should be in the appropriate hash bucket.
337
 
        for entry_idx, text_offset, hash_val in just_entries:
338
 
            hash_idx = hash_val & 0xf
339
 
            self.assertTrue(
340
 
                hash_list[hash_idx] <= entry_idx < hash_list[hash_idx + 1])
341
 
 
342
261
    def test_first_add_source_doesnt_index_until_make_delta(self):
343
262
        di = self._gc_module.DeltaIndex()
344
263
        self.assertFalse(di._has_index())
348
267
        # generated, and will generate a proper delta
349
268
        delta = di.make_delta(_text2)
350
269
        self.assertTrue(di._has_index())
351
 
        self.assertEqual(b'N\x90/\x1fdiffer from\nagainst other text\n', delta)
352
 
 
353
 
    def test_add_source_max_bytes_to_index(self):
354
 
        di = self._gc_module.DeltaIndex()
355
 
        di._max_bytes_to_index = 3 * 16
356
 
        di.add_source(_text1, 0)  # (77 bytes -1) // 3 = 25 byte stride
357
 
        di.add_source(_text3, 3)  # (135 bytes -1) // 3 = 44 byte stride
358
 
        start2 = len(_text1) + 3
359
 
        hash_list, entry_list = di._dump_index()
360
 
        self.assertEqual(16, len(hash_list))
361
 
        self.assertEqual(67, len(entry_list))
362
 
        just_entries = sorted([(text_offset, hash_val)
363
 
                               for text_offset, hash_val in entry_list
364
 
                               if text_offset != 0 or hash_val != 0])
365
 
        rabin_hash = self._gc_module._rabin_hash
366
 
        self.assertEqual([(25, rabin_hash(_text1[10:26])),
367
 
                          (50, rabin_hash(_text1[35:51])),
368
 
                          (75, rabin_hash(_text1[60:76])),
369
 
                          (start2 + 44, rabin_hash(_text3[29:45])),
370
 
                          (start2 + 88, rabin_hash(_text3[73:89])),
371
 
                          (start2 + 132, rabin_hash(_text3[117:133])),
372
 
                          ], just_entries)
 
270
        self.assertEqual('N\x90/\x1fdiffer from\nagainst other text\n', delta)
373
271
 
374
272
    def test_second_add_source_triggers_make_index(self):
375
273
        di = self._gc_module.DeltaIndex()
382
280
    def test_make_delta(self):
383
281
        di = self._gc_module.DeltaIndex(_text1)
384
282
        delta = di.make_delta(_text2)
385
 
        self.assertEqual(b'N\x90/\x1fdiffer from\nagainst other text\n', delta)
 
283
        self.assertEqual('N\x90/\x1fdiffer from\nagainst other text\n', delta)
386
284
 
387
285
    def test_delta_against_multiple_sources(self):
388
286
        di = self._gc_module.DeltaIndex()
394
292
        delta = di.make_delta(_third_text)
395
293
        result = self._gc_module.apply_delta(_first_text + _second_text, delta)
396
294
        self.assertEqualDiff(_third_text, result)
397
 
        self.assertEqual(b'\x85\x01\x90\x14\x0chas some in '
398
 
                         b'\x91v6\x03and\x91d"\x91:\n', delta)
 
295
        self.assertEqual('\x85\x01\x90\x14\x0chas some in '
 
296
                         '\x91v6\x03and\x91d"\x91:\n', delta)
399
297
 
400
298
    def test_delta_with_offsets(self):
401
299
        di = self._gc_module.DeltaIndex()
407
305
        delta = di.make_delta(_third_text)
408
306
        self.assertIsNot(None, delta)
409
307
        result = self._gc_module.apply_delta(
410
 
            b'12345' + _first_text + b'1234567890' + _second_text, delta)
 
308
            '12345' + _first_text + '1234567890' + _second_text, delta)
411
309
        self.assertIsNot(None, result)
412
310
        self.assertEqualDiff(_third_text, result)
413
 
        self.assertEqual(b'\x85\x01\x91\x05\x14\x0chas some in '
414
 
                         b'\x91\x856\x03and\x91s"\x91?\n', delta)
 
311
        self.assertEqual('\x85\x01\x91\x05\x14\x0chas some in '
 
312
                         '\x91\x856\x03and\x91s"\x91?\n', delta)
415
313
 
416
314
    def test_delta_with_delta_bytes(self):
417
315
        di = self._gc_module.DeltaIndex()
419
317
        di.add_source(_first_text, 0)
420
318
        self.assertEqual(len(_first_text), di._source_offset)
421
319
        delta = di.make_delta(_second_text)
422
 
        self.assertEqual(b'h\tsome more\x91\x019'
423
 
                         b'&previous text\nand has some extra text\n', delta)
 
320
        self.assertEqual('h\tsome more\x91\x019'
 
321
                         '&previous text\nand has some extra text\n', delta)
424
322
        di.add_delta_source(delta, 0)
425
323
        source += delta
426
324
        self.assertEqual(len(_first_text) + len(delta), di._source_offset)
432
330
        # Note that we don't match the 'common with the', because it isn't long
433
331
        # enough to match in the original text, and those bytes are not present
434
332
        # in the delta for the second text.
435
 
        self.assertEqual(b'\x85\x01\x90\x14\x1chas some in common with the '
436
 
                         b'\x91S&\x03and\x91\x18,', second_delta)
 
333
        self.assertEqual('\x85\x01\x90\x14\x1chas some in common with the '
 
334
                         '\x91S&\x03and\x91\x18,', second_delta)
437
335
        # Add this delta, and create a new delta for the same text. We should
438
336
        # find the remaining text, and only insert the short 'and' text.
439
337
        di.add_delta_source(second_delta, 0)
441
339
        third_delta = di.make_delta(_third_text)
442
340
        result = self._gc_module.apply_delta(source, third_delta)
443
341
        self.assertEqualDiff(_third_text, result)
444
 
        self.assertEqual(b'\x85\x01\x90\x14\x91\x7e\x1c'
445
 
                         b'\x91S&\x03and\x91\x18,', third_delta)
 
342
        self.assertEqual('\x85\x01\x90\x14\x91\x7e\x1c'
 
343
                         '\x91S&\x03and\x91\x18,', third_delta)
446
344
        # Now create a delta, which we know won't be able to be 'fit' into the
447
345
        # existing index
448
346
        fourth_delta = di.make_delta(_fourth_text)
449
347
        self.assertEqual(_fourth_text,
450
348
                         self._gc_module.apply_delta(source, fourth_delta))
451
 
        self.assertEqual(b'\x80\x01'
452
 
                         b'\x7f123456789012345\nsame rabin hash\n'
453
 
                         b'123456789012345\nsame rabin hash\n'
454
 
                         b'123456789012345\nsame rabin hash\n'
455
 
                         b'123456789012345\nsame rabin hash'
456
 
                         b'\x01\n', fourth_delta)
 
349
        self.assertEqual('\x80\x01'
 
350
                         '\x7f123456789012345\nsame rabin hash\n'
 
351
                         '123456789012345\nsame rabin hash\n'
 
352
                         '123456789012345\nsame rabin hash\n'
 
353
                         '123456789012345\nsame rabin hash'
 
354
                         '\x01\n', fourth_delta)
457
355
        di.add_delta_source(fourth_delta, 0)
458
356
        source += fourth_delta
459
357
        # With the next delta, everything should be found
460
358
        fifth_delta = di.make_delta(_fourth_text)
461
359
        self.assertEqual(_fourth_text,
462
360
                         self._gc_module.apply_delta(source, fifth_delta))
463
 
        self.assertEqual(b'\x80\x01\x91\xa7\x7f\x01\n', fifth_delta)
 
361
        self.assertEqual('\x80\x01\x91\xa7\x7f\x01\n', fifth_delta)
464
362
 
465
363
 
466
364
class TestCopyInstruction(tests.TestCase):
467
365
 
468
366
    def assertEncode(self, expected, offset, length):
469
 
        data = _groupcompress_py.encode_copy_instruction(offset, length)
470
 
        self.assertEqual(expected, data)
 
367
        bytes = _groupcompress_py.encode_copy_instruction(offset, length)
 
368
        if expected != bytes:
 
369
            self.assertEqual([hex(ord(e)) for e in expected],
 
370
                             [hex(ord(b)) for b in bytes])
471
371
 
472
 
    def assertDecode(self, exp_offset, exp_length, exp_newpos, data, pos):
473
 
        cmd = indexbytes(data, pos)
 
372
    def assertDecode(self, exp_offset, exp_length, exp_newpos, bytes, pos):
 
373
        cmd = ord(bytes[pos])
474
374
        pos += 1
475
 
        out = _groupcompress_py.decode_copy_instruction(data, cmd, pos)
 
375
        out = _groupcompress_py.decode_copy_instruction(bytes, cmd, pos)
476
376
        self.assertEqual((exp_offset, exp_length, exp_newpos), out)
477
377
 
478
378
    def test_encode_no_length(self):
479
 
        self.assertEncode(b'\x80', 0, 64 * 1024)
480
 
        self.assertEncode(b'\x81\x01', 1, 64 * 1024)
481
 
        self.assertEncode(b'\x81\x0a', 10, 64 * 1024)
482
 
        self.assertEncode(b'\x81\xff', 255, 64 * 1024)
483
 
        self.assertEncode(b'\x82\x01', 256, 64 * 1024)
484
 
        self.assertEncode(b'\x83\x01\x01', 257, 64 * 1024)
485
 
        self.assertEncode(b'\x8F\xff\xff\xff\xff', 0xFFFFFFFF, 64 * 1024)
486
 
        self.assertEncode(b'\x8E\xff\xff\xff', 0xFFFFFF00, 64 * 1024)
487
 
        self.assertEncode(b'\x8D\xff\xff\xff', 0xFFFF00FF, 64 * 1024)
488
 
        self.assertEncode(b'\x8B\xff\xff\xff', 0xFF00FFFF, 64 * 1024)
489
 
        self.assertEncode(b'\x87\xff\xff\xff', 0x00FFFFFF, 64 * 1024)
490
 
        self.assertEncode(b'\x8F\x04\x03\x02\x01', 0x01020304, 64 * 1024)
 
379
        self.assertEncode('\x80', 0, 64*1024)
 
380
        self.assertEncode('\x81\x01', 1, 64*1024)
 
381
        self.assertEncode('\x81\x0a', 10, 64*1024)
 
382
        self.assertEncode('\x81\xff', 255, 64*1024)
 
383
        self.assertEncode('\x82\x01', 256, 64*1024)
 
384
        self.assertEncode('\x83\x01\x01', 257, 64*1024)
 
385
        self.assertEncode('\x8F\xff\xff\xff\xff', 0xFFFFFFFF, 64*1024)
 
386
        self.assertEncode('\x8E\xff\xff\xff', 0xFFFFFF00, 64*1024)
 
387
        self.assertEncode('\x8D\xff\xff\xff', 0xFFFF00FF, 64*1024)
 
388
        self.assertEncode('\x8B\xff\xff\xff', 0xFF00FFFF, 64*1024)
 
389
        self.assertEncode('\x87\xff\xff\xff', 0x00FFFFFF, 64*1024)
 
390
        self.assertEncode('\x8F\x04\x03\x02\x01', 0x01020304, 64*1024)
491
391
 
492
392
    def test_encode_no_offset(self):
493
 
        self.assertEncode(b'\x90\x01', 0, 1)
494
 
        self.assertEncode(b'\x90\x0a', 0, 10)
495
 
        self.assertEncode(b'\x90\xff', 0, 255)
496
 
        self.assertEncode(b'\xA0\x01', 0, 256)
497
 
        self.assertEncode(b'\xB0\x01\x01', 0, 257)
498
 
        self.assertEncode(b'\xB0\xff\xff', 0, 0xFFFF)
 
393
        self.assertEncode('\x90\x01', 0, 1)
 
394
        self.assertEncode('\x90\x0a', 0, 10)
 
395
        self.assertEncode('\x90\xff', 0, 255)
 
396
        self.assertEncode('\xA0\x01', 0, 256)
 
397
        self.assertEncode('\xB0\x01\x01', 0, 257)
 
398
        self.assertEncode('\xB0\xff\xff', 0, 0xFFFF)
499
399
        # Special case, if copy == 64KiB, then we store exactly 0
500
400
        # Note that this puns with a copy of exactly 0 bytes, but we don't care
501
401
        # about that, as we would never actually copy 0 bytes
502
 
        self.assertEncode(b'\x80', 0, 64 * 1024)
 
402
        self.assertEncode('\x80', 0, 64*1024)
503
403
 
504
404
    def test_encode(self):
505
 
        self.assertEncode(b'\x91\x01\x01', 1, 1)
506
 
        self.assertEncode(b'\x91\x09\x0a', 9, 10)
507
 
        self.assertEncode(b'\x91\xfe\xff', 254, 255)
508
 
        self.assertEncode(b'\xA2\x02\x01', 512, 256)
509
 
        self.assertEncode(b'\xB3\x02\x01\x01\x01', 258, 257)
510
 
        self.assertEncode(b'\xB0\x01\x01', 0, 257)
 
405
        self.assertEncode('\x91\x01\x01', 1, 1)
 
406
        self.assertEncode('\x91\x09\x0a', 9, 10)
 
407
        self.assertEncode('\x91\xfe\xff', 254, 255)
 
408
        self.assertEncode('\xA2\x02\x01', 512, 256)
 
409
        self.assertEncode('\xB3\x02\x01\x01\x01', 258, 257)
 
410
        self.assertEncode('\xB0\x01\x01', 0, 257)
511
411
        # Special case, if copy == 64KiB, then we store exactly 0
512
412
        # Note that this puns with a copy of exactly 0 bytes, but we don't care
513
413
        # about that, as we would never actually copy 0 bytes
514
 
        self.assertEncode(b'\x81\x0a', 10, 64 * 1024)
 
414
        self.assertEncode('\x81\x0a', 10, 64*1024)
515
415
 
516
416
    def test_decode_no_length(self):
517
417
        # If length is 0, it is interpreted as 64KiB
518
418
        # The shortest possible instruction is a copy of 64KiB from offset 0
519
 
        self.assertDecode(0, 65536, 1, b'\x80', 0)
520
 
        self.assertDecode(1, 65536, 2, b'\x81\x01', 0)
521
 
        self.assertDecode(10, 65536, 2, b'\x81\x0a', 0)
522
 
        self.assertDecode(255, 65536, 2, b'\x81\xff', 0)
523
 
        self.assertDecode(256, 65536, 2, b'\x82\x01', 0)
524
 
        self.assertDecode(257, 65536, 3, b'\x83\x01\x01', 0)
525
 
        self.assertDecode(0xFFFFFFFF, 65536, 5, b'\x8F\xff\xff\xff\xff', 0)
526
 
        self.assertDecode(0xFFFFFF00, 65536, 4, b'\x8E\xff\xff\xff', 0)
527
 
        self.assertDecode(0xFFFF00FF, 65536, 4, b'\x8D\xff\xff\xff', 0)
528
 
        self.assertDecode(0xFF00FFFF, 65536, 4, b'\x8B\xff\xff\xff', 0)
529
 
        self.assertDecode(0x00FFFFFF, 65536, 4, b'\x87\xff\xff\xff', 0)
530
 
        self.assertDecode(0x01020304, 65536, 5, b'\x8F\x04\x03\x02\x01', 0)
 
419
        self.assertDecode(0, 65536, 1, '\x80', 0)
 
420
        self.assertDecode(1, 65536, 2, '\x81\x01', 0)
 
421
        self.assertDecode(10, 65536, 2, '\x81\x0a', 0)
 
422
        self.assertDecode(255, 65536, 2, '\x81\xff', 0)
 
423
        self.assertDecode(256, 65536, 2, '\x82\x01', 0)
 
424
        self.assertDecode(257, 65536, 3, '\x83\x01\x01', 0)
 
425
        self.assertDecode(0xFFFFFFFF, 65536, 5, '\x8F\xff\xff\xff\xff', 0)
 
426
        self.assertDecode(0xFFFFFF00, 65536, 4, '\x8E\xff\xff\xff', 0)
 
427
        self.assertDecode(0xFFFF00FF, 65536, 4, '\x8D\xff\xff\xff', 0)
 
428
        self.assertDecode(0xFF00FFFF, 65536, 4, '\x8B\xff\xff\xff', 0)
 
429
        self.assertDecode(0x00FFFFFF, 65536, 4, '\x87\xff\xff\xff', 0)
 
430
        self.assertDecode(0x01020304, 65536, 5, '\x8F\x04\x03\x02\x01', 0)
531
431
 
532
432
    def test_decode_no_offset(self):
533
 
        self.assertDecode(0, 1, 2, b'\x90\x01', 0)
534
 
        self.assertDecode(0, 10, 2, b'\x90\x0a', 0)
535
 
        self.assertDecode(0, 255, 2, b'\x90\xff', 0)
536
 
        self.assertDecode(0, 256, 2, b'\xA0\x01', 0)
537
 
        self.assertDecode(0, 257, 3, b'\xB0\x01\x01', 0)
538
 
        self.assertDecode(0, 65535, 3, b'\xB0\xff\xff', 0)
 
433
        self.assertDecode(0, 1, 2, '\x90\x01', 0)
 
434
        self.assertDecode(0, 10, 2, '\x90\x0a', 0)
 
435
        self.assertDecode(0, 255, 2, '\x90\xff', 0)
 
436
        self.assertDecode(0, 256, 2, '\xA0\x01', 0)
 
437
        self.assertDecode(0, 257, 3, '\xB0\x01\x01', 0)
 
438
        self.assertDecode(0, 65535, 3, '\xB0\xff\xff', 0)
539
439
        # Special case, if copy == 64KiB, then we store exactly 0
540
440
        # Note that this puns with a copy of exactly 0 bytes, but we don't care
541
441
        # about that, as we would never actually copy 0 bytes
542
 
        self.assertDecode(0, 65536, 1, b'\x80', 0)
 
442
        self.assertDecode(0, 65536, 1, '\x80', 0)
543
443
 
544
444
    def test_decode(self):
545
 
        self.assertDecode(1, 1, 3, b'\x91\x01\x01', 0)
546
 
        self.assertDecode(9, 10, 3, b'\x91\x09\x0a', 0)
547
 
        self.assertDecode(254, 255, 3, b'\x91\xfe\xff', 0)
548
 
        self.assertDecode(512, 256, 3, b'\xA2\x02\x01', 0)
549
 
        self.assertDecode(258, 257, 5, b'\xB3\x02\x01\x01\x01', 0)
550
 
        self.assertDecode(0, 257, 3, b'\xB0\x01\x01', 0)
 
445
        self.assertDecode(1, 1, 3, '\x91\x01\x01', 0)
 
446
        self.assertDecode(9, 10, 3, '\x91\x09\x0a', 0)
 
447
        self.assertDecode(254, 255, 3, '\x91\xfe\xff', 0)
 
448
        self.assertDecode(512, 256, 3, '\xA2\x02\x01', 0)
 
449
        self.assertDecode(258, 257, 5, '\xB3\x02\x01\x01\x01', 0)
 
450
        self.assertDecode(0, 257, 3, '\xB0\x01\x01', 0)
551
451
 
552
452
    def test_decode_not_start(self):
553
 
        self.assertDecode(1, 1, 6, b'abc\x91\x01\x01def', 3)
554
 
        self.assertDecode(9, 10, 5, b'ab\x91\x09\x0ade', 2)
555
 
        self.assertDecode(254, 255, 6, b'not\x91\xfe\xffcopy', 3)
 
453
        self.assertDecode(1, 1, 6, 'abc\x91\x01\x01def', 3)
 
454
        self.assertDecode(9, 10, 5, 'ab\x91\x09\x0ade', 2)
 
455
        self.assertDecode(254, 255, 6, 'not\x91\xfe\xffcopy', 3)
556
456
 
557
457
 
558
458
class TestBase128Int(tests.TestCase):
559
459
 
560
 
    scenarios = module_scenarios()
561
 
 
562
 
    _gc_module = None  # Set by load_tests
 
460
    _gc_module = None # Set by load_tests
563
461
 
564
462
    def assertEqualEncode(self, bytes, val):
565
463
        self.assertEqual(bytes, self._gc_module.encode_base128_int(val))
569
467
                         self._gc_module.decode_base128_int(bytes))
570
468
 
571
469
    def test_encode(self):
572
 
        self.assertEqualEncode(b'\x01', 1)
573
 
        self.assertEqualEncode(b'\x02', 2)
574
 
        self.assertEqualEncode(b'\x7f', 127)
575
 
        self.assertEqualEncode(b'\x80\x01', 128)
576
 
        self.assertEqualEncode(b'\xff\x01', 255)
577
 
        self.assertEqualEncode(b'\x80\x02', 256)
578
 
        self.assertEqualEncode(b'\xff\xff\xff\xff\x0f', 0xFFFFFFFF)
 
470
        self.assertEqualEncode('\x01', 1)
 
471
        self.assertEqualEncode('\x02', 2)
 
472
        self.assertEqualEncode('\x7f', 127)
 
473
        self.assertEqualEncode('\x80\x01', 128)
 
474
        self.assertEqualEncode('\xff\x01', 255)
 
475
        self.assertEqualEncode('\x80\x02', 256)
 
476
        self.assertEqualEncode('\xff\xff\xff\xff\x0f', 0xFFFFFFFF)
579
477
 
580
478
    def test_decode(self):
581
 
        self.assertEqualDecode(1, 1, b'\x01')
582
 
        self.assertEqualDecode(2, 1, b'\x02')
583
 
        self.assertEqualDecode(127, 1, b'\x7f')
584
 
        self.assertEqualDecode(128, 2, b'\x80\x01')
585
 
        self.assertEqualDecode(255, 2, b'\xff\x01')
586
 
        self.assertEqualDecode(256, 2, b'\x80\x02')
587
 
        self.assertEqualDecode(0xFFFFFFFF, 5, b'\xff\xff\xff\xff\x0f')
 
479
        self.assertEqualDecode(1, 1, '\x01')
 
480
        self.assertEqualDecode(2, 1, '\x02')
 
481
        self.assertEqualDecode(127, 1, '\x7f')
 
482
        self.assertEqualDecode(128, 2, '\x80\x01')
 
483
        self.assertEqualDecode(255, 2, '\xff\x01')
 
484
        self.assertEqualDecode(256, 2, '\x80\x02')
 
485
        self.assertEqualDecode(0xFFFFFFFF, 5, '\xff\xff\xff\xff\x0f')
588
486
 
589
487
    def test_decode_with_trailing_bytes(self):
590
 
        self.assertEqualDecode(1, 1, b'\x01abcdef')
591
 
        self.assertEqualDecode(127, 1, b'\x7f\x01')
592
 
        self.assertEqualDecode(128, 2, b'\x80\x01abcdef')
593
 
        self.assertEqualDecode(255, 2, b'\xff\x01\xff')
 
488
        self.assertEqualDecode(1, 1, '\x01abcdef')
 
489
        self.assertEqualDecode(127, 1, '\x7f\x01')
 
490
        self.assertEqualDecode(128, 2, '\x80\x01abcdef')
 
491
        self.assertEqualDecode(255, 2, '\xff\x01\xff')
 
492
 
 
493