/brz/remove-bazaar

To get this branch, use:
bzr branch http://gegoxaren.bato24.eu/bzr/brz/remove-bazaar
4241.6.6 by Robert Collins, John Arbash Meinel, Ian Clatworthy, Vincent Ladeuil
Groupcompress from brisbane-core.
1
# Copyright (C) 2008, 2009 Canonical Ltd
2
#
3
# This program is free software; you can redistribute it and/or modify
4
# it under the terms of the GNU General Public License as published by
5
# the Free Software Foundation; either version 2 of the License, or
6
# (at your option) any later version.
7
#
8
# This program is distributed in the hope that it will be useful,
9
# but WITHOUT ANY WARRANTY; without even the implied warranty of
10
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
11
# GNU General Public License for more details.
12
#
13
# You should have received a copy of the GNU General Public License
14
# along with this program; if not, write to the Free Software
15
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
16
17
"""Tests for the python and pyrex extensions of groupcompress"""
18
19
from bzrlib import (
20
    groupcompress,
21
    _groupcompress_py,
22
    tests,
23
    )
24
25
26
def load_tests(standard_tests, module, loader):
    """Parameterize tests for all versions of groupcompress.

    TestMakeAndApplyDelta and TestBase128Int are multiplied by one scenario
    per implementation module (exposed to the test as ``_gc_module``).
    TestMakeAndApplyCompatible is multiplied by one scenario per
    (make_delta, apply_delta) pairing, so that a delta produced by one
    implementation is applied by the other.

    In the two-way scenario ids the first letter names the implementation
    supplying make_delta and the second the one supplying apply_delta:
    'P' is _groupcompress_py and 'C' is _groupcompress_pyx.

    :param standard_tests: The suite of tests to parameterize.
    :param module: Not used by this function.
    :param loader: Not used by this function.
    :return: The suite produced by the final tests.multiply_tests call.
    """
    two_way_scenarios = [
        ('PP', {'make_delta': _groupcompress_py.make_delta,
                'apply_delta': _groupcompress_py.apply_delta})
        ]
    scenarios = [
        ('python', {'_gc_module': _groupcompress_py}),
        ]
    # The compiled scenarios are only added when the extension can be
    # imported, so a pure-python install still runs the 'python'/'PP' cases.
    if CompiledGroupCompressFeature.available():
        from bzrlib import _groupcompress_pyx
        scenarios.append(('C',
            {'_gc_module': _groupcompress_pyx}))
        two_way_scenarios.extend([
            ('CC', {'make_delta': _groupcompress_pyx.make_delta,
                    'apply_delta': _groupcompress_pyx.apply_delta}),
            ('PC', {'make_delta': _groupcompress_py.make_delta,
                    'apply_delta': _groupcompress_pyx.apply_delta}),
            ('CP', {'make_delta': _groupcompress_pyx.make_delta,
                    'apply_delta': _groupcompress_py.apply_delta}),
            ])
    # First pass: per-implementation tests.
    to_adapt, result = tests.split_suite_by_condition(
        standard_tests, tests.condition_isinstance((TestMakeAndApplyDelta,
                                                    TestBase128Int)))
    result = tests.multiply_tests(to_adapt, scenarios, result)
    # Second pass: cross-implementation compatibility tests, taken out of
    # whatever the first pass produced.
    to_adapt, result = tests.split_suite_by_condition(result,
        tests.condition_isinstance(TestMakeAndApplyCompatible))
    result = tests.multiply_tests(to_adapt, two_way_scenarios, result)
    return result
55
56
57
class _CompiledGroupCompressFeature(tests.Feature):
    """Feature reporting whether bzrlib._groupcompress_pyx can be imported."""

    def _probe(self):
        # The import is attempted only for its success or failure; the
        # module object itself is not used here.
        try:
            import bzrlib._groupcompress_pyx
        except ImportError:
            return False
        else:
            return True

    def feature_name(self):
        return 'bzrlib._groupcompress_pyx'
69
70
71
# Module-level feature instance: load_tests asks it whether the compiled
# scenarios can be added, and TestDeltaIndex.setUp requires it.
CompiledGroupCompressFeature = _CompiledGroupCompressFeature()
72
73
_text1 = """\
74
This is a bit
75
of source text
76
which is meant to be matched
77
against other text
78
"""
79
80
_text2 = """\
81
This is a bit
82
of source text
83
which is meant to differ from
84
against other text
85
"""
86
87
_text3 = """\
88
This is a bit
89
of source text
90
which is meant to be matched
91
against other text
92
except it also
93
has a lot more data
94
at the end of the file
95
"""
96
97
_first_text = """\
98
a bit of text, that
99
does not have much in
100
common with the next text
101
"""
102
103
_second_text = """\
104
some more bit of text, that
105
does not have much in
106
common with the previous text
107
and has some extra text
108
"""
109
110
111
_third_text = """\
112
a bit of text, that
113
has some in common with the previous text
114
and has some extra text
115
and not have much in
116
common with the next text
117
"""
118
119
_fourth_text = """\
120
123456789012345
121
same rabin hash
122
123456789012345
123
same rabin hash
124
123456789012345
125
same rabin hash
126
123456789012345
127
same rabin hash
128
"""
129
130
class TestMakeAndApplyDelta(tests.TestCase):
    """Check make_delta/apply_delta of a single groupcompress implementation.

    load_tests multiplies this class by one scenario per implementation and
    sets ``_gc_module`` to the module under test.
    """

    _gc_module = None # Set by load_tests

    def setUp(self):
        super(TestMakeAndApplyDelta, self).setUp()
        # Bind the functions under test once, so the test bodies do not
        # depend on which implementation module the scenario selected.
        self.make_delta = self._gc_module.make_delta
        self.apply_delta = self._gc_module.apply_delta
        self.apply_delta_to_source = self._gc_module.apply_delta_to_source

    def test_make_delta_is_typesafe(self):
        # Two plain strings must be accepted ...
        self.make_delta('a string', 'another string')

        def _check_make_delta(string1, string2):
            self.assertRaises(TypeError, self.make_delta, string1, string2)

        # ... while an arbitrary object or a unicode string in either
        # position must be rejected with TypeError.
        _check_make_delta('a string', object())
        _check_make_delta('a string', u'not a string')
        _check_make_delta(object(), 'a string')
        _check_make_delta(u'not a string', 'a string')

    def test_make_noop_delta(self):
        # A delta of a text against itself.  The expectations start with
        # chr(len(text)) (two bytes for the 135 byte _text3) followed by a
        # single '\x90'-prefixed instruction, which looks like one copy of
        # the whole source -- see TestCopyInstruction for that encoding.
        ident_delta = self.make_delta(_text1, _text1)
        self.assertEqual('M\x90M', ident_delta)
        ident_delta = self.make_delta(_text2, _text2)
        self.assertEqual('N\x90N', ident_delta)
        ident_delta = self.make_delta(_text3, _text3)
        self.assertEqual('\x87\x01\x90\x87', ident_delta)

    def assertDeltaIn(self, delta1, delta2, delta):
        """Make sure that the delta bytes match one of the expectations.

        :param delta1: The first acceptable delta.
        :param delta2: The second acceptable delta.
        :param delta: The delta that was actually produced.
        """
        # In general, the python delta matcher gives different results than the
        # pyrex delta matcher. Both should be valid deltas, though.
        if delta not in (delta1, delta2):
            self.fail("Delta bytes:\n"
                      "       %r\n"
                      "not in %r\n"
                      "    or %r"
                      % (delta, delta1, delta2))

    def test_make_delta(self):
        # For each pair, the first alternative is the delta that
        # TestDeltaIndex.test_make_delta expects from the compiled
        # DeltaIndex; the second is presumably what the python matcher
        # produces.
        delta = self.make_delta(_text1, _text2)
        self.assertDeltaIn(
            'N\x90/\x1fdiffer from\nagainst other text\n',
            'N\x90\x1d\x1ewhich is meant to differ from\n\x91:\x13',
            delta)
        delta = self.make_delta(_text2, _text1)
        self.assertDeltaIn(
            'M\x90/\x1ebe matched\nagainst other text\n',
            'M\x90\x1d\x1dwhich is meant to be matched\n\x91;\x13',
            delta)
        # _text1 is a prefix of _text3, so one copy is enough.
        delta = self.make_delta(_text3, _text1)
        self.assertEqual('M\x90M', delta)
        delta = self.make_delta(_text3, _text2)
        self.assertDeltaIn(
            'N\x90/\x1fdiffer from\nagainst other text\n',
            'N\x90\x1d\x1ewhich is meant to differ from\n\x91:\x13',
            delta)

    def test_apply_delta_is_typesafe(self):
        # Plain string source and delta are accepted ...
        self.apply_delta(_text1, 'M\x90M')
        # ... anything else in either position raises TypeError.
        self.assertRaises(TypeError, self.apply_delta, object(), 'M\x90M')
        self.assertRaises(TypeError, self.apply_delta,
                          unicode(_text1), 'M\x90M')
        self.assertRaises(TypeError, self.apply_delta, _text1, u'M\x90M')
        self.assertRaises(TypeError, self.apply_delta, _text1, object())

    def test_apply_delta(self):
        target = self.apply_delta(_text1,
                    'N\x90/\x1fdiffer from\nagainst other text\n')
        self.assertEqual(_text2, target)
        target = self.apply_delta(_text2,
                    'M\x90/\x1ebe matched\nagainst other text\n')
        self.assertEqual(_text1, target)

    def test_apply_delta_to_source_is_safe(self):
        # The first argument must be a plain string.
        self.assertRaises(TypeError,
            self.apply_delta_to_source, object(), 0, 1)
        self.assertRaises(TypeError,
            self.apply_delta_to_source, u'unicode str', 0, 1)
        # end > length
        self.assertRaises(ValueError,
            self.apply_delta_to_source, 'foo', 1, 4)
        # start > length
        self.assertRaises(ValueError,
            self.apply_delta_to_source, 'foo', 5, 3)
        # start > end
        self.assertRaises(ValueError,
            self.apply_delta_to_source, 'foo', 3, 2)

    def test_apply_delta_to_source(self):
        # The delta is stored directly after its source in one string; the
        # two integers give where the delta starts and ends in that string.
        source_and_delta = (_text1
                            + 'N\x90/\x1fdiffer from\nagainst other text\n')
        self.assertEqual(_text2, self.apply_delta_to_source(source_and_delta,
                                    len(_text1), len(source_and_delta)))
225
226
227
class TestMakeAndApplyCompatible(tests.TestCase):
    """Check that deltas made by one implementation apply with another.

    load_tests multiplies this class by the two-way scenarios, each of which
    supplies a ``make_delta`` and an ``apply_delta`` callable.
    """

    make_delta = None # Set by load_tests
    apply_delta = None # Set by load_tests

    def assertMakeAndApply(self, source, target):
        """Assert that generating a delta and applying gives success.

        :param source: The text the delta is computed against.
        :param target: The text that applying the delta must reproduce.
        """
        delta = self.make_delta(source, target)
        # Named 'rebuilt' rather than 'bytes' so the builtin is not shadowed.
        rebuilt = self.apply_delta(source, delta)
        self.assertEqualDiff(target, rebuilt)

    def test_direct(self):
        # Every ordered pair of the three related fixture texts.
        self.assertMakeAndApply(_text1, _text2)
        self.assertMakeAndApply(_text2, _text1)
        self.assertMakeAndApply(_text1, _text3)
        self.assertMakeAndApply(_text3, _text1)
        self.assertMakeAndApply(_text2, _text3)
        self.assertMakeAndApply(_text3, _text2)
245
246
247
class TestDeltaIndex(tests.TestCase):
    """Tests for the DeltaIndex class of the compiled extension."""

    def setUp(self):
        super(TestDeltaIndex, self).setUp()
        # This test isn't multiplied, because we only have DeltaIndex for the
        # compiled form
        # We call this here, because _test_needs_features happens after setUp
        self.requireFeature(CompiledGroupCompressFeature)
        from bzrlib import _groupcompress_pyx
        self._gc_module = _groupcompress_pyx

    def test_repr(self):
        # 'test text\n' is 10 bytes; the leading 1 is presumably the number
        # of sources held by the index.
        di = self._gc_module.DeltaIndex('test text\n')
        self.assertEqual('DeltaIndex(1, 10)', repr(di))

    def test_make_delta(self):
        di = self._gc_module.DeltaIndex(_text1)
        delta = di.make_delta(_text2)
        self.assertEqual('N\x90/\x1fdiffer from\nagainst other text\n', delta)

    def test_delta_against_multiple_sources(self):
        di = self._gc_module.DeltaIndex()
        di.add_source(_first_text, 0)
        self.assertEqual(len(_first_text), di._source_offset)
        di.add_source(_second_text, 0)
        self.assertEqual(len(_first_text) + len(_second_text),
                         di._source_offset)
        delta = di.make_delta(_third_text)
        # The delta is applied against the concatenation of the sources, in
        # the order they were added.
        result = self._gc_module.apply_delta(_first_text + _second_text, delta)
        self.assertEqualDiff(_third_text, result)
        self.assertEqual('\x85\x01\x90\x14\x0chas some in '
                         '\x91v6\x03and\x91d"\x91:\n', delta)

    def test_delta_with_offsets(self):
        # The second argument to add_source is added to _source_offset on
        # top of the text length; it appears to stand for bytes that precede
        # the text in the combined source ('12345' and '1234567890' below).
        di = self._gc_module.DeltaIndex()
        di.add_source(_first_text, 5)
        self.assertEqual(len(_first_text) + 5, di._source_offset)
        di.add_source(_second_text, 10)
        self.assertEqual(len(_first_text) + len(_second_text) + 15,
                         di._source_offset)
        delta = di.make_delta(_third_text)
        self.assertIsNot(None, delta)
        result = self._gc_module.apply_delta(
            '12345' + _first_text + '1234567890' + _second_text, delta)
        self.assertIsNot(None, result)
        self.assertEqualDiff(_third_text, result)
        self.assertEqual('\x85\x01\x91\x05\x14\x0chas some in '
                         '\x91\x856\x03and\x91s"\x91?\n', delta)

    def test_delta_with_delta_bytes(self):
        # 'source' mirrors what has been added to the index: the first text
        # followed by each delta, so that later deltas can be applied to it.
        di = self._gc_module.DeltaIndex()
        source = _first_text
        di.add_source(_first_text, 0)
        self.assertEqual(len(_first_text), di._source_offset)
        delta = di.make_delta(_second_text)
        self.assertEqual('h\tsome more\x91\x019'
                         '&previous text\nand has some extra text\n', delta)
        di.add_delta_source(delta, 0)
        source += delta
        self.assertEqual(len(_first_text) + len(delta), di._source_offset)
        second_delta = di.make_delta(_third_text)
        result = self._gc_module.apply_delta(source, second_delta)
        self.assertEqualDiff(_third_text, result)
        # We should be able to match against the
        # 'previous text\nand has some...'  that was part of the delta bytes
        # Note that we don't match the 'common with the', because it isn't long
        # enough to match in the original text, and those bytes are not present
        # in the delta for the second text.
        self.assertEqual('\x85\x01\x90\x14\x1chas some in common with the '
                         '\x91S&\x03and\x91\x18,', second_delta)
        # Add this delta, and create a new delta for the same text. We should
        # find the remaining text, and only insert the short 'and' text.
        di.add_delta_source(second_delta, 0)
        source += second_delta
        third_delta = di.make_delta(_third_text)
        result = self._gc_module.apply_delta(source, third_delta)
        self.assertEqualDiff(_third_text, result)
        self.assertEqual('\x85\x01\x90\x14\x91\x7e\x1c'
                         '\x91S&\x03and\x91\x18,', third_delta)
        # Now create a delta, which we know won't be able to be 'fit' into the
        # existing index
        fourth_delta = di.make_delta(_fourth_text)
        self.assertEqual(_fourth_text,
                         self._gc_module.apply_delta(source, fourth_delta))
        self.assertEqual('\x80\x01'
                         '\x7f123456789012345\nsame rabin hash\n'
                         '123456789012345\nsame rabin hash\n'
                         '123456789012345\nsame rabin hash\n'
                         '123456789012345\nsame rabin hash'
                         '\x01\n', fourth_delta)
        di.add_delta_source(fourth_delta, 0)
        source += fourth_delta
        # With the next delta, everything should be found
        fifth_delta = di.make_delta(_fourth_text)
        self.assertEqual(_fourth_text,
                         self._gc_module.apply_delta(source, fifth_delta))
        self.assertEqual('\x80\x01\x91\xa7\x7f\x01\n', fifth_delta)
344
345
346
class TestCopyInstruction(tests.TestCase):
    """Tests for the copy-instruction codec in _groupcompress_py.

    Judging by the expectations below, a copy instruction is a command byte
    with 0x80 set, whose low four bits say which of the four offset bytes
    follow and whose next bits (0x10, 0x20) say which length bytes follow.
    Bytes are given least-significant first and zero bytes are left out.
    """

    def assertEncode(self, expected, offset, length):
        """Assert that (offset, length) encodes to the string ``expected``."""
        encoded = _groupcompress_py.encode_copy_instruction(offset, length)
        if expected != encoded:
            # Compare as lists of hex values so that a failure is readable
            # even though the strings are mostly non-printable.
            self.assertEqual([hex(ord(e)) for e in expected],
                             [hex(ord(b)) for b in encoded])

    def assertDecode(self, exp_offset, exp_length, exp_newpos, bytes, pos):
        """Assert the result of decoding the copy instruction at bytes[pos].

        :param exp_offset: The expected copy offset.
        :param exp_length: The expected copy length.
        :param exp_newpos: The expected position just past the instruction.
        :param bytes: The string holding the instruction.
        :param pos: The index of the instruction's command byte.
        """
        cmd = ord(bytes[pos])
        # decode_copy_instruction is handed the command byte separately,
        # together with the position of the byte that follows it.
        pos += 1
        out = _groupcompress_py.decode_copy_instruction(bytes, cmd, pos)
        self.assertEqual((exp_offset, exp_length, exp_newpos), out)

    def test_encode_no_length(self):
        self.assertEncode('\x80', 0, None)
        self.assertEncode('\x81\x01', 1, None)
        self.assertEncode('\x81\x0a', 10, None)
        self.assertEncode('\x81\xff', 255, None)
        self.assertEncode('\x82\x01', 256, None)
        self.assertEncode('\x83\x01\x01', 257, None)
        self.assertEncode('\x8F\xff\xff\xff\xff', 0xFFFFFFFF, None)
        self.assertEncode('\x8E\xff\xff\xff', 0xFFFFFF00, None)
        self.assertEncode('\x8D\xff\xff\xff', 0xFFFF00FF, None)
        self.assertEncode('\x8B\xff\xff\xff', 0xFF00FFFF, None)
        self.assertEncode('\x87\xff\xff\xff', 0x00FFFFFF, None)
        self.assertEncode('\x8F\x04\x03\x02\x01', 0x01020304, None)

    def test_encode_no_offset(self):
        self.assertEncode('\x90\x01', 0, 1)
        self.assertEncode('\x90\x0a', 0, 10)
        self.assertEncode('\x90\xff', 0, 255)
        self.assertEncode('\xA0\x01', 0, 256)
        self.assertEncode('\xB0\x01\x01', 0, 257)
        self.assertEncode('\xB0\xff\xff', 0, 0xFFFF)
        # Special case, if copy == 64KiB, then we store exactly 0
        # Note that this puns with a copy of exactly 0 bytes, but we don't care
        # about that, as we would never actually copy 0 bytes
        self.assertEncode('\x80', 0, 64*1024)

    def test_encode(self):
        self.assertEncode('\x91\x01\x01', 1, 1)
        self.assertEncode('\x91\x09\x0a', 9, 10)
        self.assertEncode('\x91\xfe\xff', 254, 255)
        self.assertEncode('\xA2\x02\x01', 512, 256)
        self.assertEncode('\xB3\x02\x01\x01\x01', 258, 257)
        self.assertEncode('\xB0\x01\x01', 0, 257)
        # Special case, if copy == 64KiB, then we store exactly 0
        # Note that this puns with a copy of exactly 0 bytes, but we don't care
        # about that, as we would never actually copy 0 bytes
        self.assertEncode('\x81\x0a', 10, 64*1024)

    def test_decode_no_length(self):
        # If length is 0, it is interpreted as 64KiB
        # The shortest possible instruction is a copy of 64KiB from offset 0
        self.assertDecode(0, 65536, 1, '\x80', 0)
        self.assertDecode(1, 65536, 2, '\x81\x01', 0)
        self.assertDecode(10, 65536, 2, '\x81\x0a', 0)
        self.assertDecode(255, 65536, 2, '\x81\xff', 0)
        self.assertDecode(256, 65536, 2, '\x82\x01', 0)
        self.assertDecode(257, 65536, 3, '\x83\x01\x01', 0)
        self.assertDecode(0xFFFFFFFF, 65536, 5, '\x8F\xff\xff\xff\xff', 0)
        self.assertDecode(0xFFFFFF00, 65536, 4, '\x8E\xff\xff\xff', 0)
        self.assertDecode(0xFFFF00FF, 65536, 4, '\x8D\xff\xff\xff', 0)
        self.assertDecode(0xFF00FFFF, 65536, 4, '\x8B\xff\xff\xff', 0)
        self.assertDecode(0x00FFFFFF, 65536, 4, '\x87\xff\xff\xff', 0)
        self.assertDecode(0x01020304, 65536, 5, '\x8F\x04\x03\x02\x01', 0)

    def test_decode_no_offset(self):
        self.assertDecode(0, 1, 2, '\x90\x01', 0)
        self.assertDecode(0, 10, 2, '\x90\x0a', 0)
        self.assertDecode(0, 255, 2, '\x90\xff', 0)
        self.assertDecode(0, 256, 2, '\xA0\x01', 0)
        self.assertDecode(0, 257, 3, '\xB0\x01\x01', 0)
        self.assertDecode(0, 65535, 3, '\xB0\xff\xff', 0)
        # Special case, if copy == 64KiB, then we store exactly 0
        # Note that this puns with a copy of exactly 0 bytes, but we don't care
        # about that, as we would never actually copy 0 bytes
        self.assertDecode(0, 65536, 1, '\x80', 0)

    def test_decode(self):
        self.assertDecode(1, 1, 3, '\x91\x01\x01', 0)
        self.assertDecode(9, 10, 3, '\x91\x09\x0a', 0)
        self.assertDecode(254, 255, 3, '\x91\xfe\xff', 0)
        self.assertDecode(512, 256, 3, '\xA2\x02\x01', 0)
        self.assertDecode(258, 257, 5, '\xB3\x02\x01\x01\x01', 0)
        self.assertDecode(0, 257, 3, '\xB0\x01\x01', 0)

    def test_decode_not_start(self):
        # The instruction sits in the middle of other bytes; the returned
        # position must be relative to the whole string.
        self.assertDecode(1, 1, 6, 'abc\x91\x01\x01def', 3)
        self.assertDecode(9, 10, 5, 'ab\x91\x09\x0ade', 2)
        self.assertDecode(254, 255, 6, 'not\x91\xfe\xffcopy', 3)
438
439
440
class TestBase128Int(tests.TestCase):
    """Tests for encode_base128_int/decode_base128_int of one implementation.

    load_tests multiplies this class by one scenario per implementation and
    sets ``_gc_module`` to the module under test.  The expectations are
    consistent with a least-significant-group-first encoding in which each
    byte carries seven bits and a set high bit means another byte follows.
    """

    _gc_module = None # Set by load_tests

    def assertEqualEncode(self, bytes, val):
        """Assert that ``val`` encodes to the string ``bytes``."""
        self.assertEqual(bytes, self._gc_module.encode_base128_int(val))

    def assertEqualDecode(self, val, num_decode, bytes):
        """Assert that ``bytes`` decodes to ``(val, num_decode)``.

        :param val: The expected integer.
        :param num_decode: The expected number of bytes consumed.
        :param bytes: The string to decode.
        """
        self.assertEqual((val, num_decode),
                         self._gc_module.decode_base128_int(bytes))

    def test_encode(self):
        self.assertEqualEncode('\x01', 1)
        self.assertEqualEncode('\x02', 2)
        self.assertEqualEncode('\x7f', 127)
        self.assertEqualEncode('\x80\x01', 128)
        self.assertEqualEncode('\xff\x01', 255)
        self.assertEqualEncode('\x80\x02', 256)
        self.assertEqualEncode('\xff\xff\xff\xff\x0f', 0xFFFFFFFF)

    def test_decode(self):
        self.assertEqualDecode(1, 1, '\x01')
        self.assertEqualDecode(2, 1, '\x02')
        self.assertEqualDecode(127, 1, '\x7f')
        self.assertEqualDecode(128, 2, '\x80\x01')
        self.assertEqualDecode(255, 2, '\xff\x01')
        self.assertEqualDecode(256, 2, '\x80\x02')
        self.assertEqualDecode(0xFFFFFFFF, 5, '\xff\xff\xff\xff\x0f')

    def test_decode_with_trailing_bytes(self):
        # Decoding must stop at the first byte without the high bit set and
        # report only the bytes it consumed, ignoring whatever follows.
        self.assertEqualDecode(1, 1, '\x01abcdef')
        self.assertEqualDecode(127, 1, '\x7f\x01')
        self.assertEqualDecode(128, 2, '\x80\x01abcdef')
        self.assertEqualDecode(255, 2, '\xff\x01\xff')
474
475