# Copyright (C) 2008, 2009 Canonical Ltd
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA

"""Tests for the python and pyrex extensions of groupcompress"""

from bzrlib import (
    _groupcompress_py,
    tests,
    )
26
def load_tests(standard_tests, module, loader):
    """Parameterize tests for all versions of groupcompress.

    :param standard_tests: The suite of tests collected from this module.
    :param module: Not used; part of the load_tests call signature.
    :param loader: Not used; part of the load_tests call signature.
    :return: The suite in which TestMakeAndApplyDelta and TestBase128Int are
        multiplied once per implementation module, and
        TestMakeAndApplyCompatible once per (make_delta, apply_delta)
        pairing.
    """
    # Scenarios pairing a make_delta from one implementation with an
    # apply_delta from another (first letter: maker, second: applier).
    two_way_scenarios = [
        ('PP', {'make_delta': _groupcompress_py.make_delta,
                'apply_delta': _groupcompress_py.apply_delta}),
        ]
    # Scenarios that inject a whole implementation module as _gc_module.
    scenarios = [
        ('python', {'_gc_module': _groupcompress_py}),
        ]
    if compiled_groupcompress.available():
        compiled = compiled_groupcompress.module
        scenarios.append(('C', {'_gc_module': compiled}))
        two_way_scenarios.extend([
            ('CC', {'make_delta': compiled.make_delta,
                    'apply_delta': compiled.apply_delta}),
            ('PC', {'make_delta': _groupcompress_py.make_delta,
                    'apply_delta': compiled.apply_delta}),
            ('CP', {'make_delta': compiled.make_delta,
                    'apply_delta': _groupcompress_py.apply_delta}),
            ])
    # Both of these classes declare "_gc_module = None # Set by load_tests".
    to_adapt, result = tests.split_suite_by_condition(
        standard_tests, tests.condition_isinstance((TestMakeAndApplyDelta,
                                                    TestBase128Int)))
    result = tests.multiply_tests(to_adapt, scenarios, result)
    to_adapt, result = tests.split_suite_by_condition(result,
        tests.condition_isinstance(TestMakeAndApplyCompatible))
    result = tests.multiply_tests(to_adapt, two_way_scenarios, result)
    return result
56
# Feature describing the compiled groupcompress extension; load_tests checks
# its availability and TestDeltaIndex requires it.
compiled_groupcompress = tests.ModuleAvailableFeature(
    'bzrlib._groupcompress_pyx')
62
which is meant to be matched
69
which is meant to differ from
76
which is meant to be matched
80
at the end of the file
86
common with the next text
90
some more bit of text, that
92
common with the previous text
93
and has some extra text
99
has some in common with the previous text
100
and has some extra text
102
common with the next text
116
class TestMakeAndApplyDelta(tests.TestCase):
    """Exercise make_delta and apply_delta of one groupcompress module.

    The module under test is supplied through the ``_gc_module`` attribute,
    which load_tests sets when it multiplies this class per implementation.
    """

    _gc_module = None # Set by load_tests

    def setUp(self):
        super(TestMakeAndApplyDelta, self).setUp()
        # Bind the functions under test once so each test reads naturally.
        self.make_delta = self._gc_module.make_delta
        self.apply_delta = self._gc_module.apply_delta
        self.apply_delta_to_source = self._gc_module.apply_delta_to_source

    def test_make_delta_is_typesafe(self):
        self.make_delta('a string', 'another string')

        def _check_make_delta(string1, string2):
            self.assertRaises(TypeError, self.make_delta, string1, string2)

        _check_make_delta('a string', object())
        _check_make_delta('a string', u'not a string')
        _check_make_delta(object(), 'a string')
        _check_make_delta(u'not a string', 'a string')

    def test_make_noop_delta(self):
        ident_delta = self.make_delta(_text1, _text1)
        self.assertEqual('M\x90M', ident_delta)
        ident_delta = self.make_delta(_text2, _text2)
        self.assertEqual('N\x90N', ident_delta)
        ident_delta = self.make_delta(_text3, _text3)
        self.assertEqual('\x87\x01\x90\x87', ident_delta)

    def assertDeltaIn(self, delta1, delta2, delta):
        """Make sure that the delta bytes match one of the expectations.

        :param delta1: The first acceptable delta.
        :param delta2: The second acceptable delta.
        :param delta: The delta actually produced.
        """
        # In general, the python delta matcher gives different results than the
        # pyrex delta matcher. Both should be valid deltas, though.
        if delta not in (delta1, delta2):
            self.fail("Delta bytes:\n"
                      "       %r\n"
                      "not in %r\n"
                      "    or %r"
                      % (delta, delta1, delta2))

    def test_make_delta(self):
        delta = self.make_delta(_text1, _text2)
        self.assertDeltaIn(
            'N\x90/\x1fdiffer from\nagainst other text\n',
            'N\x90\x1d\x1ewhich is meant to differ from\n\x91:\x13',
            delta)
        delta = self.make_delta(_text2, _text1)
        self.assertDeltaIn(
            'M\x90/\x1ebe matched\nagainst other text\n',
            'M\x90\x1d\x1dwhich is meant to be matched\n\x91;\x13',
            delta)
        delta = self.make_delta(_text3, _text1)
        self.assertEqual('M\x90M', delta)
        delta = self.make_delta(_text3, _text2)
        self.assertDeltaIn(
            'N\x90/\x1fdiffer from\nagainst other text\n',
            'N\x90\x1d\x1ewhich is meant to differ from\n\x91:\x13',
            delta)

    def test_make_delta_with_large_copies(self):
        # We want to have a copy that is larger than 64kB, which forces us to
        # issue multiple copy instructions.
        big_text = _text3 * 1220
        delta = self.make_delta(big_text, big_text)
        self.assertDeltaIn(
            '\xdc\x86\x0a'      # Encoding the length of the uncompressed text
            '\x80'              # Copy 64kB, starting at byte 0
            '\x84\x01'          # and another 64kB starting at 64kB
            '\xb4\x02\x5c\x83', # And the bit of tail.
            None,   # Both implementations should be identical
            delta)

    def test_apply_delta_is_typesafe(self):
        self.apply_delta(_text1, 'M\x90M')
        self.assertRaises(TypeError, self.apply_delta, object(), 'M\x90M')
        self.assertRaises(TypeError, self.apply_delta,
                          unicode(_text1), 'M\x90M')
        self.assertRaises(TypeError, self.apply_delta, _text1, u'M\x90M')
        self.assertRaises(TypeError, self.apply_delta, _text1, object())

    def test_apply_delta(self):
        target = self.apply_delta(_text1,
                    'N\x90/\x1fdiffer from\nagainst other text\n')
        self.assertEqual(_text2, target)
        target = self.apply_delta(_text2,
                    'M\x90/\x1ebe matched\nagainst other text\n')
        self.assertEqual(_text1, target)

    def test_apply_delta_to_source_is_safe(self):
        self.assertRaises(TypeError,
            self.apply_delta_to_source, object(), 0, 1)
        self.assertRaises(TypeError,
            self.apply_delta_to_source, u'unicode str', 0, 1)
        # end > length
        self.assertRaises(ValueError,
            self.apply_delta_to_source, 'foo', 1, 4)
        # start > length
        self.assertRaises(ValueError,
            self.apply_delta_to_source, 'foo', 5, 3)
        # start > end
        self.assertRaises(ValueError,
            self.apply_delta_to_source, 'foo', 3, 2)

    def test_apply_delta_to_source(self):
        # The delta is stored directly after the source text it refers to.
        source_and_delta = (_text1
                            + 'N\x90/\x1fdiffer from\nagainst other text\n')
        self.assertEqual(_text2, self.apply_delta_to_source(source_and_delta,
                                    len(_text1), len(source_and_delta)))
226
class TestMakeAndApplyCompatible(tests.TestCase):
    """Round-trip texts through a make_delta/apply_delta pairing.

    load_tests supplies the two callables, which may come from different
    implementations.
    """

    make_delta = None # Set by load_tests
    apply_delta = None # Set by load_tests

    def assertMakeAndApply(self, source, target):
        """Check that a delta from source to target reproduces target."""
        produced_delta = self.make_delta(source, target)
        rebuilt = self.apply_delta(source, produced_delta)
        self.assertEqualDiff(target, rebuilt)

    def test_direct(self):
        # Every ordered pair of the three sample texts, in a fixed order.
        ordered_pairs = [
            (_text1, _text2),
            (_text2, _text1),
            (_text1, _text3),
            (_text3, _text1),
            (_text2, _text3),
            (_text3, _text2),
            ]
        for source, target in ordered_pairs:
            self.assertMakeAndApply(source, target)
246
class TestDeltaIndex(tests.TestCase):
    """Tests for the DeltaIndex type of the compiled groupcompress module."""

    def setUp(self):
        super(TestDeltaIndex, self).setUp()
        # This test isn't multiplied, because we only have DeltaIndex for the
        # compiled form
        # We call this here, because _test_needs_features happens after setUp
        self.requireFeature(compiled_groupcompress)
        self._gc_module = compiled_groupcompress.module

    def test_repr(self):
        di = self._gc_module.DeltaIndex('test text\n')
        self.assertEqual('DeltaIndex(1, 10)', repr(di))

    def test_first_add_source_doesnt_index_until_make_delta(self):
        di = self._gc_module.DeltaIndex()
        self.assertFalse(di._has_index())
        di.add_source(_text1, 0)
        self.assertFalse(di._has_index())
        # However, asking to make a delta will trigger the index to be
        # generated, and will generate a proper delta
        delta = di.make_delta(_text2)
        self.assertTrue(di._has_index())
        self.assertEqual('N\x90/\x1fdiffer from\nagainst other text\n', delta)

    def test_second_add_source_triggers_make_index(self):
        di = self._gc_module.DeltaIndex()
        self.assertFalse(di._has_index())
        di.add_source(_text1, 0)
        self.assertFalse(di._has_index())
        di.add_source(_text2, 0)
        self.assertTrue(di._has_index())

    def test_make_delta(self):
        di = self._gc_module.DeltaIndex(_text1)
        delta = di.make_delta(_text2)
        self.assertEqual('N\x90/\x1fdiffer from\nagainst other text\n', delta)

    def test_delta_against_multiple_sources(self):
        di = self._gc_module.DeltaIndex()
        di.add_source(_first_text, 0)
        self.assertEqual(len(_first_text), di._source_offset)
        di.add_source(_second_text, 0)
        self.assertEqual(len(_first_text) + len(_second_text),
                         di._source_offset)
        delta = di.make_delta(_third_text)
        result = self._gc_module.apply_delta(_first_text + _second_text, delta)
        self.assertEqualDiff(_third_text, result)
        self.assertEqual('\x85\x01\x90\x14\x0chas some in '
                         '\x91v6\x03and\x91d"\x91:\n', delta)

    def test_delta_with_offsets(self):
        di = self._gc_module.DeltaIndex()
        di.add_source(_first_text, 5)
        self.assertEqual(len(_first_text) + 5, di._source_offset)
        di.add_source(_second_text, 10)
        self.assertEqual(len(_first_text) + len(_second_text) + 15,
                         di._source_offset)
        delta = di.make_delta(_third_text)
        self.assertIsNot(None, delta)
        # The padding strings stand in for the 5 and 10 unindexed bytes that
        # were declared in front of each source.
        result = self._gc_module.apply_delta(
            '12345' + _first_text + '1234567890' + _second_text, delta)
        self.assertIsNot(None, result)
        self.assertEqualDiff(_third_text, result)
        self.assertEqual('\x85\x01\x91\x05\x14\x0chas some in '
                         '\x91\x856\x03and\x91s"\x91?\n', delta)

    def test_delta_with_delta_bytes(self):
        di = self._gc_module.DeltaIndex()
        # 'source' mirrors the bytes the index has been given so far, so that
        # the deltas it produces can be applied against it.
        source = _first_text
        di.add_source(_first_text, 0)
        self.assertEqual(len(_first_text), di._source_offset)
        delta = di.make_delta(_second_text)
        self.assertEqual('h\tsome more\x91\x019'
                         '&previous text\nand has some extra text\n', delta)
        di.add_delta_source(delta, 0)
        source += delta
        self.assertEqual(len(_first_text) + len(delta), di._source_offset)
        second_delta = di.make_delta(_third_text)
        result = self._gc_module.apply_delta(source, second_delta)
        self.assertEqualDiff(_third_text, result)
        # We should be able to match against the
        # 'previous text\nand has some...'  that was part of the delta bytes
        # Note that we don't match the 'common with the', because it isn't long
        # enough to match in the original text, and those bytes are not present
        # in the delta for the second text.
        self.assertEqual('\x85\x01\x90\x14\x1chas some in common with the '
                         '\x91S&\x03and\x91\x18,', second_delta)
        # Add this delta, and create a new delta for the same text. We should
        # find the remaining text, and only insert the short 'and' text.
        di.add_delta_source(second_delta, 0)
        source += second_delta
        third_delta = di.make_delta(_third_text)
        result = self._gc_module.apply_delta(source, third_delta)
        self.assertEqualDiff(_third_text, result)
        self.assertEqual('\x85\x01\x90\x14\x91\x7e\x1c'
                         '\x91S&\x03and\x91\x18,', third_delta)
        # Now create a delta, which we know won't be able to be 'fit' into the
        # existing index
        fourth_delta = di.make_delta(_fourth_text)
        self.assertEqual(_fourth_text,
                         self._gc_module.apply_delta(source, fourth_delta))
        self.assertEqual('\x80\x01'
                         '\x7f123456789012345\nsame rabin hash\n'
                         '123456789012345\nsame rabin hash\n'
                         '123456789012345\nsame rabin hash\n'
                         '123456789012345\nsame rabin hash'
                         '\x01\n', fourth_delta)
        di.add_delta_source(fourth_delta, 0)
        source += fourth_delta
        # With the next delta, everything should be found
        fifth_delta = di.make_delta(_fourth_text)
        self.assertEqual(_fourth_text,
                         self._gc_module.apply_delta(source, fifth_delta))
        self.assertEqual('\x80\x01\x91\xa7\x7f\x01\n', fifth_delta)
363
class TestCopyInstruction(tests.TestCase):
    """Tests for the python encoder and decoder of copy instructions."""

    def assertEncode(self, expected, offset, length):
        """Check that encoding (offset, length) yields the expected bytes.

        :param expected: The byte string the encoder should produce.
        :param offset: The copy offset to encode.
        :param length: The copy length to encode.
        """
        encoded = _groupcompress_py.encode_copy_instruction(offset, length)
        if expected != encoded:
            # Compare as lists of hex values so a failure is readable.
            self.assertEqual([hex(ord(e)) for e in expected],
                             [hex(ord(b)) for b in encoded])

    def assertDecode(self, exp_offset, exp_length, exp_newpos, bytes, pos):
        """Check the result of decoding the copy instruction at bytes[pos].

        :param exp_offset: The expected decoded offset.
        :param exp_length: The expected decoded length.
        :param exp_newpos: The expected position after the instruction.
        :param bytes: The byte string holding the instruction.
        :param pos: The index of the instruction's command byte.
        """
        cmd = ord(bytes[pos])
        # The decoder is handed the command byte separately, so it must start
        # reading at the byte that follows it.
        pos += 1
        out = _groupcompress_py.decode_copy_instruction(bytes, cmd, pos)
        self.assertEqual((exp_offset, exp_length, exp_newpos), out)

    def test_encode_no_length(self):
        self.assertEncode('\x80', 0, 64*1024)
        self.assertEncode('\x81\x01', 1, 64*1024)
        self.assertEncode('\x81\x0a', 10, 64*1024)
        self.assertEncode('\x81\xff', 255, 64*1024)
        self.assertEncode('\x82\x01', 256, 64*1024)
        self.assertEncode('\x83\x01\x01', 257, 64*1024)
        self.assertEncode('\x8F\xff\xff\xff\xff', 0xFFFFFFFF, 64*1024)
        self.assertEncode('\x8E\xff\xff\xff', 0xFFFFFF00, 64*1024)
        self.assertEncode('\x8D\xff\xff\xff', 0xFFFF00FF, 64*1024)
        self.assertEncode('\x8B\xff\xff\xff', 0xFF00FFFF, 64*1024)
        self.assertEncode('\x87\xff\xff\xff', 0x00FFFFFF, 64*1024)
        self.assertEncode('\x8F\x04\x03\x02\x01', 0x01020304, 64*1024)

    def test_encode_no_offset(self):
        self.assertEncode('\x90\x01', 0, 1)
        self.assertEncode('\x90\x0a', 0, 10)
        self.assertEncode('\x90\xff', 0, 255)
        self.assertEncode('\xA0\x01', 0, 256)
        self.assertEncode('\xB0\x01\x01', 0, 257)
        self.assertEncode('\xB0\xff\xff', 0, 0xFFFF)
        # Special case, if copy == 64KiB, then we store exactly 0
        # Note that this puns with a copy of exactly 0 bytes, but we don't care
        # about that, as we would never actually copy 0 bytes
        self.assertEncode('\x80', 0, 64*1024)

    def test_encode(self):
        self.assertEncode('\x91\x01\x01', 1, 1)
        self.assertEncode('\x91\x09\x0a', 9, 10)
        self.assertEncode('\x91\xfe\xff', 254, 255)
        self.assertEncode('\xA2\x02\x01', 512, 256)
        self.assertEncode('\xB3\x02\x01\x01\x01', 258, 257)
        self.assertEncode('\xB0\x01\x01', 0, 257)
        # Special case, if copy == 64KiB, then we store exactly 0
        # Note that this puns with a copy of exactly 0 bytes, but we don't care
        # about that, as we would never actually copy 0 bytes
        self.assertEncode('\x81\x0a', 10, 64*1024)

    def test_decode_no_length(self):
        # If length is 0, it is interpreted as 64KiB
        # The shortest possible instruction is a copy of 64KiB from offset 0
        self.assertDecode(0, 65536, 1, '\x80', 0)
        self.assertDecode(1, 65536, 2, '\x81\x01', 0)
        self.assertDecode(10, 65536, 2, '\x81\x0a', 0)
        self.assertDecode(255, 65536, 2, '\x81\xff', 0)
        self.assertDecode(256, 65536, 2, '\x82\x01', 0)
        self.assertDecode(257, 65536, 3, '\x83\x01\x01', 0)
        self.assertDecode(0xFFFFFFFF, 65536, 5, '\x8F\xff\xff\xff\xff', 0)
        self.assertDecode(0xFFFFFF00, 65536, 4, '\x8E\xff\xff\xff', 0)
        self.assertDecode(0xFFFF00FF, 65536, 4, '\x8D\xff\xff\xff', 0)
        self.assertDecode(0xFF00FFFF, 65536, 4, '\x8B\xff\xff\xff', 0)
        self.assertDecode(0x00FFFFFF, 65536, 4, '\x87\xff\xff\xff', 0)
        self.assertDecode(0x01020304, 65536, 5, '\x8F\x04\x03\x02\x01', 0)

    def test_decode_no_offset(self):
        self.assertDecode(0, 1, 2, '\x90\x01', 0)
        self.assertDecode(0, 10, 2, '\x90\x0a', 0)
        self.assertDecode(0, 255, 2, '\x90\xff', 0)
        self.assertDecode(0, 256, 2, '\xA0\x01', 0)
        self.assertDecode(0, 257, 3, '\xB0\x01\x01', 0)
        self.assertDecode(0, 65535, 3, '\xB0\xff\xff', 0)
        # Special case, if copy == 64KiB, then we store exactly 0
        # Note that this puns with a copy of exactly 0 bytes, but we don't care
        # about that, as we would never actually copy 0 bytes
        self.assertDecode(0, 65536, 1, '\x80', 0)

    def test_decode(self):
        self.assertDecode(1, 1, 3, '\x91\x01\x01', 0)
        self.assertDecode(9, 10, 3, '\x91\x09\x0a', 0)
        self.assertDecode(254, 255, 3, '\x91\xfe\xff', 0)
        self.assertDecode(512, 256, 3, '\xA2\x02\x01', 0)
        self.assertDecode(258, 257, 5, '\xB3\x02\x01\x01\x01', 0)
        self.assertDecode(0, 257, 3, '\xB0\x01\x01', 0)

    def test_decode_not_start(self):
        # The instruction is embedded in other bytes; pos selects its start.
        self.assertDecode(1, 1, 6, 'abc\x91\x01\x01def', 3)
        self.assertDecode(9, 10, 5, 'ab\x91\x09\x0ade', 2)
        self.assertDecode(254, 255, 6, 'not\x91\xfe\xffcopy', 3)
457
class TestBase128Int(tests.TestCase):
    """Check encode_base128_int and decode_base128_int of one module.

    The module under test is supplied through the ``_gc_module`` attribute.
    """

    _gc_module = None # Set by load_tests

    def assertEqualEncode(self, bytes, val):
        """Check that encoding val produces exactly the given bytes."""
        encoded = self._gc_module.encode_base128_int(val)
        self.assertEqual(bytes, encoded)

    def assertEqualDecode(self, val, num_decode, bytes):
        """Check that decoding bytes gives (val, num_decode)."""
        decoded = self._gc_module.decode_base128_int(bytes)
        self.assertEqual((val, num_decode), decoded)

    def test_encode(self):
        # (expected bytes, value to encode)
        cases = [
            ('\x01', 1),
            ('\x02', 2),
            ('\x7f', 127),
            ('\x80\x01', 128),
            ('\xff\x01', 255),
            ('\x80\x02', 256),
            ('\xff\xff\xff\xff\x0f', 0xFFFFFFFF),
            ]
        for expected, value in cases:
            self.assertEqualEncode(expected, value)

    def test_decode(self):
        # (expected value, expected bytes consumed, input bytes)
        cases = [
            (1, 1, '\x01'),
            (2, 1, '\x02'),
            (127, 1, '\x7f'),
            (128, 2, '\x80\x01'),
            (255, 2, '\xff\x01'),
            (256, 2, '\x80\x02'),
            (0xFFFFFFFF, 5, '\xff\xff\xff\xff\x0f'),
            ]
        for value, consumed, encoded in cases:
            self.assertEqualDecode(value, consumed, encoded)

    def test_decode_with_trailing_bytes(self):
        # (expected value, expected bytes consumed, input with extra bytes)
        cases = [
            (1, 1, '\x01abcdef'),
            (127, 1, '\x7f\x01'),
            (128, 2, '\x80\x01abcdef'),
            (255, 2, '\xff\x01\xff'),
            ]
        for value, consumed, encoded in cases:
            self.assertEqualDecode(value, consumed, encoded)