1
# Copyright (C) 2008, 2009 Canonical Ltd
3
# This program is free software; you can redistribute it and/or modify
4
# it under the terms of the GNU General Public License as published by
5
# the Free Software Foundation; either version 2 of the License, or
6
# (at your option) any later version.
8
# This program is distributed in the hope that it will be useful,
9
# but WITHOUT ANY WARRANTY; without even the implied warranty of
10
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11
# GNU General Public License for more details.
13
# You should have received a copy of the GNU General Public License
14
# along with this program; if not, write to the Free Software
15
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
17
"""Tests for the python and pyrex extensions of groupcompress"""
26
def load_tests(standard_tests, module, loader):
27
"""Parameterize tests for all versions of groupcompress."""
# NOTE(review): this view of the function is incomplete. The statements that
# open and close the `scenarios` / `two_way_scenarios` lists, the closing
# brackets of the `extend([...` call and of the first isinstance tuple, and
# any return statement are not visible here. Confirm against the full file
# before changing anything below.
29
('PP', {'make_delta': _groupcompress_py.make_delta,
30
'apply_delta': _groupcompress_py.apply_delta})
33
('python', {'_gc_module': _groupcompress_py}),
35
# Only when the compiled extension is available: add the 'C' scenario and
# the make/apply combinations that involve the compiled module (CC, PC, CP).
if CompiledGroupCompressFeature.available():
36
from bzrlib import _groupcompress_pyx
37
scenarios.append(('C',
38
{'_gc_module': _groupcompress_pyx}))
39
two_way_scenarios.extend([
40
('CC', {'make_delta': _groupcompress_pyx.make_delta,
41
'apply_delta': _groupcompress_pyx.apply_delta}),
42
('PC', {'make_delta': _groupcompress_py.make_delta,
43
'apply_delta': _groupcompress_pyx.apply_delta}),
44
('CP', {'make_delta': _groupcompress_pyx.make_delta,
45
'apply_delta': _groupcompress_py.apply_delta}),
47
# First pass: the tests selected by the isinstance condition are multiplied
# by `scenarios` (one `_gc_module` per scenario). Presumably
# split_suite_by_condition returns (matching, remaining) -- confirm.
to_adapt, result = tests.split_suite_by_condition(
48
standard_tests, tests.condition_isinstance((TestMakeAndApplyDelta,
50
result = tests.multiply_tests(to_adapt, scenarios, result)
51
# Second pass: TestMakeAndApplyCompatible is multiplied by the
# make_delta/apply_delta pairings in `two_way_scenarios`.
to_adapt, result = tests.split_suite_by_condition(result,
52
tests.condition_isinstance(TestMakeAndApplyCompatible))
53
result = tests.multiply_tests(to_adapt, two_way_scenarios, result)
57
# Test feature describing the optional compiled groupcompress extension.
# NOTE(review): the header of the method that performs the import below, and
# whatever handles a failed import, are not visible in this view.
class _CompiledGroupCompressFeature(tests.Feature):
61
import bzrlib._groupcompress_pyx
67
def feature_name(self):
68
# Name of the module whose availability this feature represents.
return 'bzrlib._groupcompress_pyx'
71
# Shared instance; load_tests calls .available() on it and TestDeltaIndex
# passes it to requireFeature().
CompiledGroupCompressFeature = _CompiledGroupCompressFeature()
76
which is meant to be matched
83
which is meant to differ from
90
which is meant to be matched
94
at the end of the file
100
common with the next text
104
some more bit of text, that
105
does not have much in
106
common with the previous text
107
and has some extra text
113
has some in common with the previous text
114
and has some extra text
116
common with the next text
130
# Exercises make_delta / apply_delta / apply_delta_to_source of whichever
# module is stored in `_gc_module`.
class TestMakeAndApplyDelta(tests.TestCase):
132
_gc_module = None # Set by load_tests
# NOTE(review): the `def` line that the following setUp statements belong to
# is not visible in this view.
135
super(TestMakeAndApplyDelta, self).setUp()
# Bind the module's functions on the instance so the tests can call
# self.make_delta(...) etc. without naming the implementation.
136
self.make_delta = self._gc_module.make_delta
137
self.apply_delta = self._gc_module.apply_delta
138
self.apply_delta_to_source = self._gc_module.apply_delta_to_source
140
def test_make_delta_is_typesafe(self):
    # Two plain strings are accepted without error.
    self.make_delta('a string', 'another string')

    # In every pair below exactly one argument is not a plain string
    # (an arbitrary object or a unicode string); each must be rejected.
    bad_pairs = (
        ('a string', object()),
        ('a string', u'not a string'),
        (object(), 'a string'),
        (u'not a string', 'a string'),
    )
    for source, target in bad_pairs:
        self.assertRaises(TypeError, self.make_delta, source, target)
151
def test_make_noop_delta(self):
    # Delta of each sample text against itself, paired with the exact
    # bytes the implementation is expected to produce.
    expectations = (
        (_text1, 'M\x90M'),
        (_text2, 'N\x90N'),
        (_text3, '\x87\x01\x90\x87'),
    )
    for text, expected in expectations:
        ident_delta = self.make_delta(text, text)
        self.assertEqual(expected, ident_delta)
159
def assertDeltaIn(self, delta1, delta2, delta):
160
"""Make sure that the delta bytes match one of the expectations."""
161
# In general, the python delta matcher gives different results than the
162
# pyrex delta matcher. Both should be valid deltas, though.
163
# Fail unless `delta` equals one of the two accepted encodings.
if delta not in (delta1, delta2):
164
self.fail("Delta bytes:\n"
# NOTE(review): the rest of the message template (the part that consumes the
# three % arguments below) is not visible in this view.
168
% (delta, delta1, delta2))
170
def test_make_delta(self):
# NOTE(review): the call that consumes each pair of expected delta strings
# below (opening line and closing `delta)` argument) is not visible in this
# view; presumably it is assertDeltaIn, which takes two candidate deltas --
# confirm against the full file.
171
delta = self.make_delta(_text1, _text2)
173
'N\x90/\x1fdiffer from\nagainst other text\n',
174
'N\x90\x1d\x1ewhich is meant to differ from\n\x91:\x13',
176
delta = self.make_delta(_text2, _text1)
178
'M\x90/\x1ebe matched\nagainst other text\n',
179
'M\x90\x1d\x1dwhich is meant to be matched\n\x91;\x13',
181
# Here a single exact encoding is required for _text3 -> _text1.
delta = self.make_delta(_text3, _text1)
182
self.assertEqual('M\x90M', delta)
183
delta = self.make_delta(_text3, _text2)
185
'N\x90/\x1fdiffer from\nagainst other text\n',
186
'N\x90\x1d\x1ewhich is meant to differ from\n\x91:\x13',
189
def test_make_delta_with_large_copies(self):
190
# We want to have a copy that is larger than 64kB, which forces us to
191
# issue multiple copy instructions.
192
big_text = _text3 * 1220
193
delta = self.make_delta(big_text, big_text)
# NOTE(review): the opening line of the assertion that takes the expected
# bytes below, and its closing argument(s), are not visible in this view.
195
'\xdc\x86\x0a' # Encoding the length of the uncompressed text
196
'\x80' # Copy 64kB, starting at byte 0
197
'\x84\x01' # and another 64kB starting at 64kB
198
'\xb4\x02\x5c\x83', # And the bit of tail.
199
None, # Both implementations should be identical
202
def test_apply_delta_is_typesafe(self):
    noop_delta = 'M\x90M'
    # Plain strings for both the source and the delta are accepted.
    self.apply_delta(_text1, noop_delta)

    # An arbitrary object or a unicode string, in either position, must
    # be rejected with TypeError.
    bad_calls = (
        (object(), noop_delta),
        (unicode(_text1), noop_delta),
        (_text1, u'M\x90M'),
        (_text1, object()),
    )
    for source, delta in bad_calls:
        self.assertRaises(TypeError, self.apply_delta, source, delta)
210
def test_apply_delta(self):
    # (source, delta, expected result): each fixed delta must turn its
    # source text into the other sample text.
    cases = (
        (_text1, 'N\x90/\x1fdiffer from\nagainst other text\n', _text2),
        (_text2, 'M\x90/\x1ebe matched\nagainst other text\n', _text1),
    )
    for source, delta, expected in cases:
        target = self.apply_delta(source, delta)
        self.assertEqual(expected, target)
218
def test_apply_delta_to_source_is_safe(self):
    apply_to_source = self.apply_delta_to_source

    # The combined source must be a plain string.
    for bad_source in (object(), u'unicode str'):
        self.assertRaises(TypeError, apply_to_source, bad_source, 0, 1)

    # (start, end) pairs that do not describe a valid region of the
    # three-byte string 'foo'.
    for start, end in ((1, 4), (5, 3), (3, 2)):
        self.assertRaises(ValueError, apply_to_source, 'foo', start, end)
233
def test_apply_delta_to_source(self):
    # The delta bytes are appended directly after the source text; the
    # two offsets passed are where the delta starts and where the
    # combined string ends.
    delta = 'N\x90/\x1fdiffer from\nagainst other text\n'
    combined = _text1 + delta
    rebuilt = self.apply_delta_to_source(combined, len(_text1),
                                         len(combined))
    self.assertEqual(_text2, rebuilt)
240
class TestMakeAndApplyCompatible(tests.TestCase):
    """Round-trip texts through a make_delta / apply_delta pairing.

    The two callables are injected by load_tests, so the delta may be
    produced by one implementation and applied by another.
    """

    make_delta = None # Set by load_tests
    apply_delta = None # Set by load_tests

    def assertMakeAndApply(self, source, target):
        """Assert that a delta from source to target rebuilds target."""
        rebuilt = self.apply_delta(source, self.make_delta(source, target))
        self.assertEqualDiff(target, rebuilt)

    def test_direct(self):
        # Every ordered pair of distinct sample texts, in a fixed order.
        pairs = (
            (_text1, _text2),
            (_text2, _text1),
            (_text1, _text3),
            (_text3, _text1),
            (_text2, _text3),
            (_text3, _text2),
        )
        for source, target in pairs:
            self.assertMakeAndApply(source, target)
260
# Tests for the DeltaIndex type, taken from the compiled module only.
# NOTE(review): the `def` lines for setUp and for the repr test are not
# visible in this view, and the first comment below is cut off mid-sentence.
class TestDeltaIndex(tests.TestCase):
263
super(TestDeltaIndex, self).setUp()
264
# This test isn't multiplied, because we only have DeltaIndex for the
266
# We call this here, because _test_needs_features happens after setUp
267
self.requireFeature(CompiledGroupCompressFeature)
268
# Imported only after the feature check above.
from bzrlib import _groupcompress_pyx
269
self._gc_module = _groupcompress_pyx
272
# repr() of an index built from a single 10-byte text.
di = self._gc_module.DeltaIndex('test text\n')
273
self.assertEqual('DeltaIndex(1, 10)', repr(di))
275
def test_make_delta(self):
    # Build an index over _text1, then delta _text2 against it and
    # require the exact expected encoding.
    index = self._gc_module.DeltaIndex(_text1)
    produced = index.make_delta(_text2)
    self.assertEqual('N\x90/\x1fdiffer from\nagainst other text\n',
                     produced)
280
def test_delta_against_multiple_sources(self):
# Index two sources back to back, then delta a third text against both.
281
di = self._gc_module.DeltaIndex()
282
di.add_source(_first_text, 0)
283
self.assertEqual(len(_first_text), di._source_offset)
284
di.add_source(_second_text, 0)
285
self.assertEqual(len(_first_text) + len(_second_text),
# NOTE(review): the second argument and closing parenthesis of the
# assertEqual above are not visible in this view.
287
delta = di.make_delta(_third_text)
288
# The delta must apply against the plain concatenation of both sources.
result = self._gc_module.apply_delta(_first_text + _second_text, delta)
289
self.assertEqualDiff(_third_text, result)
290
self.assertEqual('\x85\x01\x90\x14\x0chas some in '
291
'\x91v6\x03and\x91d"\x91:\n', delta)
293
def test_delta_with_offsets(self):
# Same idea as indexing two sources, but each add_source call is given a
# non-zero second argument (5, then 10); _source_offset is expected to grow
# by that amount in addition to the text length.
294
di = self._gc_module.DeltaIndex()
295
di.add_source(_first_text, 5)
296
self.assertEqual(len(_first_text) + 5, di._source_offset)
297
di.add_source(_second_text, 10)
298
self.assertEqual(len(_first_text) + len(_second_text) + 15,
# NOTE(review): the second argument and closing parenthesis of the
# assertEqual above are not visible in this view.
300
delta = di.make_delta(_third_text)
301
self.assertIsNot(None, delta)
302
# The source handed to apply_delta carries 5 and 10 bytes of filler in
# front of the two texts, matching the values passed to add_source.
result = self._gc_module.apply_delta(
303
'12345' + _first_text + '1234567890' + _second_text, delta)
304
self.assertIsNot(None, result)
305
self.assertEqualDiff(_third_text, result)
306
self.assertEqual('\x85\x01\x91\x05\x14\x0chas some in '
307
'\x91\x856\x03and\x91s"\x91?\n', delta)
309
def test_delta_with_delta_bytes(self):
# Feeds previously generated deltas back into the index as extra sources and
# checks the exact bytes of each subsequent delta.
# NOTE(review): `source` is read and extended below but its initial
# assignment is not visible in this view, nor is any update between the
# first add_delta_source call and the following assertion. Confirm against
# the full file.
310
di = self._gc_module.DeltaIndex()
312
di.add_source(_first_text, 0)
313
self.assertEqual(len(_first_text), di._source_offset)
314
delta = di.make_delta(_second_text)
315
self.assertEqual('h\tsome more\x91\x019'
316
'&previous text\nand has some extra text\n', delta)
317
di.add_delta_source(delta, 0)
319
self.assertEqual(len(_first_text) + len(delta), di._source_offset)
320
second_delta = di.make_delta(_third_text)
321
result = self._gc_module.apply_delta(source, second_delta)
322
self.assertEqualDiff(_third_text, result)
323
# We should be able to match against the
324
# 'previous text\nand has some...' that was part of the delta bytes
325
# Note that we don't match the 'common with the', because it isn't long
326
# enough to match in the original text, and those bytes are not present
327
# in the delta for the second text.
328
self.assertEqual('\x85\x01\x90\x14\x1chas some in common with the '
329
'\x91S&\x03and\x91\x18,', second_delta)
330
# Add this delta, and create a new delta for the same text. We should
331
# find the remaining text, and only insert the short 'and' text.
332
di.add_delta_source(second_delta, 0)
333
source += second_delta
334
third_delta = di.make_delta(_third_text)
335
result = self._gc_module.apply_delta(source, third_delta)
336
self.assertEqualDiff(_third_text, result)
337
self.assertEqual('\x85\x01\x90\x14\x91\x7e\x1c'
338
'\x91S&\x03and\x91\x18,', third_delta)
339
# Now create a delta, which we know won't be able to be 'fit' into the
# NOTE(review): the continuation of the comment above is not visible here.
341
fourth_delta = di.make_delta(_fourth_text)
342
self.assertEqual(_fourth_text,
343
self._gc_module.apply_delta(source, fourth_delta))
344
self.assertEqual('\x80\x01'
345
'\x7f123456789012345\nsame rabin hash\n'
346
'123456789012345\nsame rabin hash\n'
347
'123456789012345\nsame rabin hash\n'
348
'123456789012345\nsame rabin hash'
349
'\x01\n', fourth_delta)
350
di.add_delta_source(fourth_delta, 0)
351
source += fourth_delta
352
# With the next delta, everything should be found
353
fifth_delta = di.make_delta(_fourth_text)
354
self.assertEqual(_fourth_text,
355
self._gc_module.apply_delta(source, fifth_delta))
356
self.assertEqual('\x80\x01\x91\xa7\x7f\x01\n', fifth_delta)
359
class TestCopyInstruction(tests.TestCase):
    """Byte-level checks of the pure-python copy instruction codec."""

    def assertEncode(self, expected, offset, length):
        """Check that (offset, length) encodes to exactly ``expected``."""
        encoded = _groupcompress_py.encode_copy_instruction(offset, length)
        if expected == encoded:
            return
        # Compare per-byte hex dumps so a mismatch is readable.
        self.assertEqual([hex(ord(e)) for e in expected],
                         [hex(ord(b)) for b in encoded])

    def assertDecode(self, exp_offset, exp_length, exp_newpos, bytes, pos):
        """Check decoding of the copy instruction found at ``bytes[pos]``."""
        # NOTE(review): `pos` is handed to the decoder unchanged, i.e. it
        # still indexes the command byte -- confirm that this is what
        # decode_copy_instruction expects.
        command = ord(bytes[pos])
        decoded = _groupcompress_py.decode_copy_instruction(bytes, command,
                                                            pos)
        self.assertEqual((exp_offset, exp_length, exp_newpos), decoded)

    def test_encode_no_length(self):
        # All cases copy exactly 64KiB, so only the offset varies.
        full_copy = 64*1024
        cases = (
            ('\x80', 0),
            ('\x81\x01', 1),
            ('\x81\x0a', 10),
            ('\x81\xff', 255),
            ('\x82\x01', 256),
            ('\x83\x01\x01', 257),
            ('\x8F\xff\xff\xff\xff', 0xFFFFFFFF),
            ('\x8E\xff\xff\xff', 0xFFFFFF00),
            ('\x8D\xff\xff\xff', 0xFFFF00FF),
            ('\x8B\xff\xff\xff', 0xFF00FFFF),
            ('\x87\xff\xff\xff', 0x00FFFFFF),
            ('\x8F\x04\x03\x02\x01', 0x01020304),
        )
        for expected, offset in cases:
            self.assertEncode(expected, offset, full_copy)

    def test_encode_no_offset(self):
        # All cases copy from offset 0, so only the length varies.
        cases = (
            ('\x90\x01', 1),
            ('\x90\x0a', 10),
            ('\x90\xff', 255),
            ('\xA0\x01', 256),
            ('\xB0\x01\x01', 257),
            ('\xB0\xff\xff', 0xFFFF),
            # Special case, if copy == 64KiB, then we store exactly 0
            # Note that this puns with a copy of exactly 0 bytes, but we
            # don't care about that, as we would never actually copy 0 bytes
            ('\x80', 64*1024),
        )
        for expected, length in cases:
            self.assertEncode(expected, 0, length)

    def test_encode(self):
        # (expected bytes, offset, length)
        cases = (
            ('\x91\x01\x01', 1, 1),
            ('\x91\x09\x0a', 9, 10),
            ('\x91\xfe\xff', 254, 255),
            ('\xA2\x02\x01', 512, 256),
            ('\xB3\x02\x01\x01\x01', 258, 257),
            ('\xB0\x01\x01', 0, 257),
            # Special case, if copy == 64KiB, then we store exactly 0
            # Note that this puns with a copy of exactly 0 bytes, but we
            # don't care about that, as we would never actually copy 0 bytes
            ('\x81\x0a', 10, 64*1024),
        )
        for expected, offset, length in cases:
            self.assertEncode(expected, offset, length)

    def test_decode_no_length(self):
        # If length is 0, it is interpreted as 64KiB
        # The shortest possible instruction is a copy of 64KiB from offset 0
        # Each case is (expected offset, expected new position, bytes).
        cases = (
            (0, 1, '\x80'),
            (1, 2, '\x81\x01'),
            (10, 2, '\x81\x0a'),
            (255, 2, '\x81\xff'),
            (256, 2, '\x82\x01'),
            (257, 3, '\x83\x01\x01'),
            (0xFFFFFFFF, 5, '\x8F\xff\xff\xff\xff'),
            (0xFFFFFF00, 4, '\x8E\xff\xff\xff'),
            (0xFFFF00FF, 4, '\x8D\xff\xff\xff'),
            (0xFF00FFFF, 4, '\x8B\xff\xff\xff'),
            (0x00FFFFFF, 4, '\x87\xff\xff\xff'),
            (0x01020304, 5, '\x8F\x04\x03\x02\x01'),
        )
        for offset, newpos, data in cases:
            self.assertDecode(offset, 65536, newpos, data, 0)

    def test_decode_no_offset(self):
        # Each case is (expected length, expected new position, bytes).
        cases = (
            (1, 2, '\x90\x01'),
            (10, 2, '\x90\x0a'),
            (255, 2, '\x90\xff'),
            (256, 2, '\xA0\x01'),
            (257, 3, '\xB0\x01\x01'),
            (65535, 3, '\xB0\xff\xff'),
            # Special case, if copy == 64KiB, then we store exactly 0
            # Note that this puns with a copy of exactly 0 bytes, but we
            # don't care about that, as we would never actually copy 0 bytes
            (65536, 1, '\x80'),
        )
        for length, newpos, data in cases:
            self.assertDecode(0, length, newpos, data, 0)

    def test_decode(self):
        # (expected offset, expected length, expected new position, bytes)
        cases = (
            (1, 1, 3, '\x91\x01\x01'),
            (9, 10, 3, '\x91\x09\x0a'),
            (254, 255, 3, '\x91\xfe\xff'),
            (512, 256, 3, '\xA2\x02\x01'),
            (258, 257, 5, '\xB3\x02\x01\x01\x01'),
            (0, 257, 3, '\xB0\x01\x01'),
        )
        for offset, length, newpos, data in cases:
            self.assertDecode(offset, length, newpos, data, 0)

    def test_decode_not_start(self):
        # The instruction is embedded in other bytes and decoding starts
        # at the given position rather than at 0.
        cases = (
            (1, 1, 6, 'abc\x91\x01\x01def', 3),
            (9, 10, 5, 'ab\x91\x09\x0ade', 2),
            (254, 255, 6, 'not\x91\xfe\xffcopy', 3),
        )
        for offset, length, newpos, data, start in cases:
            self.assertDecode(offset, length, newpos, data, start)
453
class TestBase128Int(tests.TestCase):
    """Check the base-128 integer codec of the module in ``_gc_module``."""

    _gc_module = None # Set by load_tests

    def assertEqualEncode(self, bytes, val):
        """Check that ``val`` encodes to exactly ``bytes``."""
        encoded = self._gc_module.encode_base128_int(val)
        self.assertEqual(bytes, encoded)

    def assertEqualDecode(self, val, num_decode, bytes):
        """Check that ``bytes`` decodes to the pair (val, num_decode)."""
        decoded = self._gc_module.decode_base128_int(bytes)
        self.assertEqual((val, num_decode), decoded)

    def test_encode(self):
        # (expected bytes, value)
        cases = (
            ('\x01', 1),
            ('\x02', 2),
            ('\x7f', 127),
            ('\x80\x01', 128),
            ('\xff\x01', 255),
            ('\x80\x02', 256),
            ('\xff\xff\xff\xff\x0f', 0xFFFFFFFF),
        )
        for expected, value in cases:
            self.assertEqualEncode(expected, value)

    def test_decode(self):
        # (expected value, expected second element of the result, bytes)
        cases = (
            (1, 1, '\x01'),
            (2, 1, '\x02'),
            (127, 1, '\x7f'),
            (128, 2, '\x80\x01'),
            (255, 2, '\xff\x01'),
            (256, 2, '\x80\x02'),
            (0xFFFFFFFF, 5, '\xff\xff\xff\xff\x0f'),
        )
        for value, count, data in cases:
            self.assertEqualDecode(value, count, data)

    def test_decode_with_trailing_bytes(self):
        # Bytes after the encoded integer must not change the result.
        cases = (
            (1, 1, '\x01abcdef'),
            (127, 1, '\x7f\x01'),
            (128, 2, '\x80\x01abcdef'),
            (255, 2, '\xff\x01\xff'),
        )
        for value, count, data in cases:
            self.assertEqualDecode(value, count, data)