14
14
# along with this program; if not, write to the Free Software
15
15
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
17
"""Tests for the pyrex extension of groupcompress"""
19
from bzrlib import tests
21
from bzrlib import groupcompress
24
class _CompiledGroupCompress(tests.Feature):
17
"""Tests for the python and pyrex extensions of groupcompress"""
26
def load_tests(standard_tests, module, loader):
27
"""Parameterize tests for all versions of groupcompress."""
29
('PP', {'make_delta': _groupcompress_py.make_delta,
30
'apply_delta': _groupcompress_py.apply_delta})
33
('python', {'_gc_module': _groupcompress_py}),
35
if CompiledGroupCompressFeature.available():
36
from bzrlib import _groupcompress_pyx
37
scenarios.append(('C',
38
{'_gc_module': _groupcompress_pyx}))
39
two_way_scenarios.extend([
40
('CC', {'make_delta': _groupcompress_pyx.make_delta,
41
'apply_delta': _groupcompress_pyx.apply_delta}),
42
('PC', {'make_delta': _groupcompress_py.make_delta,
43
'apply_delta': _groupcompress_pyx.apply_delta}),
44
('CP', {'make_delta': _groupcompress_pyx.make_delta,
45
'apply_delta': _groupcompress_py.apply_delta}),
47
to_adapt, result = tests.split_suite_by_condition(
48
standard_tests, tests.condition_isinstance((TestMakeAndApplyDelta,
50
result = tests.multiply_tests(to_adapt, scenarios, result)
51
to_adapt, result = tests.split_suite_by_condition(result,
52
tests.condition_isinstance(TestMakeAndApplyCompatible))
53
result = tests.multiply_tests(to_adapt, two_way_scenarios, result)
57
class _CompiledGroupCompressFeature(tests.Feature):
96
class Test_GroupCompress(tests.TestCase):
97
"""Direct tests for the compiled extension."""
100
super(Test_GroupCompress, self).setUp()
101
self.requireFeature(CompiledGroupCompress)
102
from bzrlib import _groupcompress_pyx
103
self._gc_module = _groupcompress_pyx
106
class TestMakeAndApplyDelta(Test_GroupCompress):
130
class TestMakeAndApplyDelta(tests.TestCase):
132
_gc_module = None # Set by load_tests
109
135
super(TestMakeAndApplyDelta, self).setUp()
110
136
self.make_delta = self._gc_module.make_delta
111
137
self.apply_delta = self._gc_module.apply_delta
138
self.apply_delta_to_source = self._gc_module.apply_delta_to_source
113
140
def test_make_delta_is_typesafe(self):
114
141
self.make_delta('a string', 'another string')
115
self.assertRaises(TypeError,
116
self.make_delta, 'a string', object())
117
self.assertRaises(TypeError,
118
self.make_delta, 'a string', u'not a string')
119
self.assertRaises(TypeError,
120
self.make_delta, object(), 'a string')
121
self.assertRaises(TypeError,
122
self.make_delta, u'not a string', 'a string')
143
def _check_make_delta(string1, string2):
144
self.assertRaises(TypeError, self.make_delta, string1, string2)
146
_check_make_delta('a string', object())
147
_check_make_delta('a string', u'not a string')
148
_check_make_delta(object(), 'a string')
149
_check_make_delta(u'not a string', 'a string')
124
151
def test_make_noop_delta(self):
125
152
ident_delta = self.make_delta(_text1, _text1)
126
self.assertEqual('MM\x90M', ident_delta)
153
self.assertEqual('M\x90M', ident_delta)
127
154
ident_delta = self.make_delta(_text2, _text2)
128
self.assertEqual('NN\x90N', ident_delta)
155
self.assertEqual('N\x90N', ident_delta)
129
156
ident_delta = self.make_delta(_text3, _text3)
130
self.assertEqual('\x87\x01\x87\x01\x90\x87', ident_delta)
157
self.assertEqual('\x87\x01\x90\x87', ident_delta)
159
def assertDeltaIn(self, delta1, delta2, delta):
160
"""Make sure that the delta bytes match one of the expectations."""
161
# In general, the python delta matcher gives different results than the
162
# pyrex delta matcher. Both should be valid deltas, though.
163
if delta not in (delta1, delta2):
164
self.fail("Delta bytes:\n"
168
% (delta, delta1, delta2))
132
170
def test_make_delta(self):
133
171
delta = self.make_delta(_text1, _text2)
134
self.assertEqual('MN\x90/\x1fdiffer from\nagainst other text\n', delta)
173
'N\x90/\x1fdiffer from\nagainst other text\n',
174
'N\x90\x1d\x1ewhich is meant to differ from\n\x91:\x13',
135
176
delta = self.make_delta(_text2, _text1)
136
self.assertEqual('NM\x90/\x1ebe matched\nagainst other text\n', delta)
178
'M\x90/\x1ebe matched\nagainst other text\n',
179
'M\x90\x1d\x1dwhich is meant to be matched\n\x91;\x13',
137
181
delta = self.make_delta(_text3, _text1)
138
self.assertEqual('\x87\x01M\x90M', delta)
182
self.assertEqual('M\x90M', delta)
139
183
delta = self.make_delta(_text3, _text2)
140
self.assertEqual('\x87\x01N\x90/\x1fdiffer from\nagainst other text\n',
185
'N\x90/\x1fdiffer from\nagainst other text\n',
186
'N\x90\x1d\x1ewhich is meant to differ from\n\x91:\x13',
143
189
def test_apply_delta_is_typesafe(self):
144
self.apply_delta(_text1, 'MM\x90M')
145
self.assertRaises(TypeError,
146
self.apply_delta, object(), 'MM\x90M')
147
self.assertRaises(TypeError,
148
self.apply_delta, unicode(_text1), 'MM\x90M')
149
self.assertRaises(TypeError,
150
self.apply_delta, _text1, u'MM\x90M')
151
self.assertRaises(TypeError,
152
self.apply_delta, _text1, object())
190
self.apply_delta(_text1, 'M\x90M')
191
self.assertRaises(TypeError, self.apply_delta, object(), 'M\x90M')
192
self.assertRaises(TypeError, self.apply_delta,
193
unicode(_text1), 'M\x90M')
194
self.assertRaises(TypeError, self.apply_delta, _text1, u'M\x90M')
195
self.assertRaises(TypeError, self.apply_delta, _text1, object())
154
197
def test_apply_delta(self):
155
198
target = self.apply_delta(_text1,
156
'MN\x90/\x1fdiffer from\nagainst other text\n')
199
'N\x90/\x1fdiffer from\nagainst other text\n')
157
200
self.assertEqual(_text2, target)
158
201
target = self.apply_delta(_text2,
159
'NM\x90/\x1ebe matched\nagainst other text\n')
202
'M\x90/\x1ebe matched\nagainst other text\n')
160
203
self.assertEqual(_text1, target)
163
class TestDeltaIndex(Test_GroupCompress):
205
def test_apply_delta_to_source_is_safe(self):
206
self.assertRaises(TypeError,
207
self.apply_delta_to_source, object(), 0, 1)
208
self.assertRaises(TypeError,
209
self.apply_delta_to_source, u'unicode str', 0, 1)
211
self.assertRaises(ValueError,
212
self.apply_delta_to_source, 'foo', 1, 4)
214
self.assertRaises(ValueError,
215
self.apply_delta_to_source, 'foo', 5, 3)
217
self.assertRaises(ValueError,
218
self.apply_delta_to_source, 'foo', 3, 2)
220
def test_apply_delta_to_source(self):
221
source_and_delta = (_text1
222
+ 'N\x90/\x1fdiffer from\nagainst other text\n')
223
self.assertEqual(_text2, self.apply_delta_to_source(source_and_delta,
224
len(_text1), len(source_and_delta)))
227
class TestMakeAndApplyCompatible(tests.TestCase):
229
make_delta = None # Set by load_tests
230
apply_delta = None # Set by load_tests
232
def assertMakeAndApply(self, source, target):
233
"""Assert that generating a delta and applying gives success."""
234
delta = self.make_delta(source, target)
235
bytes = self.apply_delta(source, delta)
236
self.assertEqualDiff(target, bytes)
238
def test_direct(self):
239
self.assertMakeAndApply(_text1, _text2)
240
self.assertMakeAndApply(_text2, _text1)
241
self.assertMakeAndApply(_text1, _text3)
242
self.assertMakeAndApply(_text3, _text1)
243
self.assertMakeAndApply(_text2, _text3)
244
self.assertMakeAndApply(_text3, _text2)
247
class TestDeltaIndex(tests.TestCase):
250
super(TestDeltaIndex, self).setUp()
251
# This test isn't multiplied, because we only have DeltaIndex for the
253
# We call this here, because _test_needs_features happens after setUp
254
self.requireFeature(CompiledGroupCompressFeature)
255
from bzrlib import _groupcompress_pyx
256
self._gc_module = _groupcompress_pyx
165
258
def test_repr(self):
166
259
di = self._gc_module.DeltaIndex('test text\n')
246
340
fifth_delta = di.make_delta(_fourth_text)
247
341
self.assertEqual(_fourth_text,
248
342
self._gc_module.apply_delta(source, fifth_delta))
249
self.assertEqual('\xac\x02\x80\x01\x91\xab\x7f\x01\n', fifth_delta)
343
self.assertEqual('\x80\x01\x91\xa7\x7f\x01\n', fifth_delta)
346
class TestCopyInstruction(tests.TestCase):
348
def assertEncode(self, expected, offset, length):
349
bytes = _groupcompress_py.encode_copy_instruction(offset, length)
350
if expected != bytes:
351
self.assertEqual([hex(ord(e)) for e in expected],
352
[hex(ord(b)) for b in bytes])
354
def assertDecode(self, exp_offset, exp_length, exp_newpos, bytes, pos):
355
cmd = ord(bytes[pos])
357
out = _groupcompress_py.decode_copy_instruction(bytes, cmd, pos)
358
self.assertEqual((exp_offset, exp_length, exp_newpos), out)
360
def test_encode_no_length(self):
361
self.assertEncode('\x80', 0, None)
362
self.assertEncode('\x81\x01', 1, None)
363
self.assertEncode('\x81\x0a', 10, None)
364
self.assertEncode('\x81\xff', 255, None)
365
self.assertEncode('\x82\x01', 256, None)
366
self.assertEncode('\x83\x01\x01', 257, None)
367
self.assertEncode('\x8F\xff\xff\xff\xff', 0xFFFFFFFF, None)
368
self.assertEncode('\x8E\xff\xff\xff', 0xFFFFFF00, None)
369
self.assertEncode('\x8D\xff\xff\xff', 0xFFFF00FF, None)
370
self.assertEncode('\x8B\xff\xff\xff', 0xFF00FFFF, None)
371
self.assertEncode('\x87\xff\xff\xff', 0x00FFFFFF, None)
372
self.assertEncode('\x8F\x04\x03\x02\x01', 0x01020304, None)
374
def test_encode_no_offset(self):
375
self.assertEncode('\x90\x01', 0, 1)
376
self.assertEncode('\x90\x0a', 0, 10)
377
self.assertEncode('\x90\xff', 0, 255)
378
self.assertEncode('\xA0\x01', 0, 256)
379
self.assertEncode('\xB0\x01\x01', 0, 257)
380
self.assertEncode('\xB0\xff\xff', 0, 0xFFFF)
381
# Special case, if copy == 64KiB, then we store exactly 0
382
# Note that this puns with a copy of exactly 0 bytes, but we don't care
383
# about that, as we would never actually copy 0 bytes
384
self.assertEncode('\x80', 0, 64*1024)
386
def test_encode(self):
387
self.assertEncode('\x91\x01\x01', 1, 1)
388
self.assertEncode('\x91\x09\x0a', 9, 10)
389
self.assertEncode('\x91\xfe\xff', 254, 255)
390
self.assertEncode('\xA2\x02\x01', 512, 256)
391
self.assertEncode('\xB3\x02\x01\x01\x01', 258, 257)
392
self.assertEncode('\xB0\x01\x01', 0, 257)
393
# Special case, if copy == 64KiB, then we store exactly 0
394
# Note that this puns with a copy of exactly 0 bytes, but we don't care
395
# about that, as we would never actually copy 0 bytes
396
self.assertEncode('\x81\x0a', 10, 64*1024)
398
def test_decode_no_length(self):
399
# If length is 0, it is interpreted as 64KiB
400
# The shortest possible instruction is a copy of 64KiB from offset 0
401
self.assertDecode(0, 65536, 1, '\x80', 0)
402
self.assertDecode(1, 65536, 2, '\x81\x01', 0)
403
self.assertDecode(10, 65536, 2, '\x81\x0a', 0)
404
self.assertDecode(255, 65536, 2, '\x81\xff', 0)
405
self.assertDecode(256, 65536, 2, '\x82\x01', 0)
406
self.assertDecode(257, 65536, 3, '\x83\x01\x01', 0)
407
self.assertDecode(0xFFFFFFFF, 65536, 5, '\x8F\xff\xff\xff\xff', 0)
408
self.assertDecode(0xFFFFFF00, 65536, 4, '\x8E\xff\xff\xff', 0)
409
self.assertDecode(0xFFFF00FF, 65536, 4, '\x8D\xff\xff\xff', 0)
410
self.assertDecode(0xFF00FFFF, 65536, 4, '\x8B\xff\xff\xff', 0)
411
self.assertDecode(0x00FFFFFF, 65536, 4, '\x87\xff\xff\xff', 0)
412
self.assertDecode(0x01020304, 65536, 5, '\x8F\x04\x03\x02\x01', 0)
414
def test_decode_no_offset(self):
415
self.assertDecode(0, 1, 2, '\x90\x01', 0)
416
self.assertDecode(0, 10, 2, '\x90\x0a', 0)
417
self.assertDecode(0, 255, 2, '\x90\xff', 0)
418
self.assertDecode(0, 256, 2, '\xA0\x01', 0)
419
self.assertDecode(0, 257, 3, '\xB0\x01\x01', 0)
420
self.assertDecode(0, 65535, 3, '\xB0\xff\xff', 0)
421
# Special case, if copy == 64KiB, then we store exactly 0
422
# Note that this puns with a copy of exactly 0 bytes, but we don't care
423
# about that, as we would never actually copy 0 bytes
424
self.assertDecode(0, 65536, 1, '\x80', 0)
426
def test_decode(self):
427
self.assertDecode(1, 1, 3, '\x91\x01\x01', 0)
428
self.assertDecode(9, 10, 3, '\x91\x09\x0a', 0)
429
self.assertDecode(254, 255, 3, '\x91\xfe\xff', 0)
430
self.assertDecode(512, 256, 3, '\xA2\x02\x01', 0)
431
self.assertDecode(258, 257, 5, '\xB3\x02\x01\x01\x01', 0)
432
self.assertDecode(0, 257, 3, '\xB0\x01\x01', 0)
434
def test_decode_not_start(self):
435
self.assertDecode(1, 1, 6, 'abc\x91\x01\x01def', 3)
436
self.assertDecode(9, 10, 5, 'ab\x91\x09\x0ade', 2)
437
self.assertDecode(254, 255, 6, 'not\x91\xfe\xffcopy', 3)
440
class TestBase128Int(tests.TestCase):
442
_gc_module = None # Set by load_tests
444
def assertEqualEncode(self, bytes, val):
445
self.assertEqual(bytes, self._gc_module.encode_base128_int(val))
447
def assertEqualDecode(self, val, num_decode, bytes):
448
self.assertEqual((val, num_decode),
449
self._gc_module.decode_base128_int(bytes))
451
def test_encode(self):
452
self.assertEqualEncode('\x01', 1)
453
self.assertEqualEncode('\x02', 2)
454
self.assertEqualEncode('\x7f', 127)
455
self.assertEqualEncode('\x80\x01', 128)
456
self.assertEqualEncode('\xff\x01', 255)
457
self.assertEqualEncode('\x80\x02', 256)
458
self.assertEqualEncode('\xff\xff\xff\xff\x0f', 0xFFFFFFFF)
460
def test_decode(self):
461
self.assertEqualDecode(1, 1, '\x01')
462
self.assertEqualDecode(2, 1, '\x02')
463
self.assertEqualDecode(127, 1, '\x7f')
464
self.assertEqualDecode(128, 2, '\x80\x01')
465
self.assertEqualDecode(255, 2, '\xff\x01')
466
self.assertEqualDecode(256, 2, '\x80\x02')
467
self.assertEqualDecode(0xFFFFFFFF, 5, '\xff\xff\xff\xff\x0f')
469
def test_decode_with_trailing_bytes(self):
470
self.assertEqualDecode(1, 1, '\x01abcdef')
471
self.assertEqualDecode(127, 1, '\x7f\x01')
472
self.assertEqualDecode(128, 2, '\x80\x01abcdef')
473
self.assertEqualDecode(255, 2, '\xff\x01\xff')