1
# Copyright (C) 2008-2011 Canonical Ltd
3
# This program is free software; you can redistribute it and/or modify
4
# it under the terms of the GNU General Public License as published by
5
# the Free Software Foundation; either version 2 of the License, or
6
# (at your option) any later version.
8
# This program is distributed in the hope that it will be useful,
9
# but WITHOUT ANY WARRANTY; without even the implied warranty of
10
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11
# GNU General Public License for more details.
13
# You should have received a copy of the GNU General Public License
14
# along with this program; if not, write to the Free Software
15
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
17
"""Tests for the python and pyrex extensions of groupcompress"""
25
from .scenarios import (
26
load_tests_apply_scenarios,
33
def module_scenarios():
    """Build the scenarios that run a test once per groupcompress module.

    The pure-Python module is always included. The compiled module is added
    only when ``compiled_groupcompress_feature`` reports it as available.

    :return: A list of ``(name, {'_gc_module': module})`` tuples.
    """
    # NOTE(review): the list delimiters and the ``return`` were not visible
    # in the text under review; they are restored because the ``append``
    # below and the ``scenarios = module_scenarios()`` callers need them.
    scenarios = [
        ('python', {'_gc_module': _groupcompress_py}),
    ]
    if compiled_groupcompress_feature.available():
        gc_module = compiled_groupcompress_feature.module
        scenarios.append(('C',
                          {'_gc_module': gc_module}))
    return scenarios
44
def two_way_scenarios():
    """Build scenarios pairing each delta creator with each delta applier.

    The pure-Python pairing ('PP') is always present. When the compiled
    module is available, the compiled pairing ('CC') and both mixed pairings
    ('PC' and 'CP') are added, so deltas made by one implementation are
    applied by the other.

    :return: A list of ``(name, {'make_delta': f, 'apply_delta': g})``
        tuples.
    """
    # NOTE(review): the list delimiters, the ``extend`` wrapper and the
    # ``return`` were not visible in the text under review; they are restored
    # to mirror module_scenarios() and the ``scenarios = two_way_scenarios()``
    # caller.
    scenarios = [
        ('PP', {'make_delta': _groupcompress_py.make_delta,
                'apply_delta': _groupcompress_py.apply_delta})
    ]
    if compiled_groupcompress_feature.available():
        gc_module = compiled_groupcompress_feature.module
        scenarios.extend([
            ('CC', {'make_delta': gc_module.make_delta,
                    'apply_delta': gc_module.apply_delta}),
            ('PC', {'make_delta': _groupcompress_py.make_delta,
                    'apply_delta': gc_module.apply_delta}),
            ('CP', {'make_delta': gc_module.make_delta,
                    'apply_delta': _groupcompress_py.apply_delta}),
        ])
    return scenarios
62
# Module-level load_tests hook, delegated to the helper imported from
# .scenarios (presumably what expands each class's ``scenarios`` list).
load_tests = load_tests_apply_scenarios
65
# Feature object for the compiled groupcompress module; the scenario builders
# query .available() and .module on it, and TestDeltaIndex requires it.
compiled_groupcompress_feature = features.ModuleAvailableFeature(
66
'breezy.bzr._groupcompress_pyx')
71
which is meant to be matched
78
which is meant to differ from
85
which is meant to be matched
89
at the end of the file
95
common with the next text
99
some more bit of text, that
100
does not have much in
101
common with the previous text
102
and has some extra text
108
has some in common with the previous text
109
and has some extra text
111
common with the next text
125
class TestMakeAndApplyDelta(tests.TestCase):
    """Delta creation and application, run once per groupcompress module."""

    scenarios = module_scenarios()
    _gc_module = None  # Set by load_tests

    def setUp(self):
        """Bind the delta entry points of the scenario's module onto self."""
        # NOTE(review): this ``def`` line was not visible in the text under
        # review; it is restored from the ``super(...).setUp()`` call below.
        super(TestMakeAndApplyDelta, self).setUp()
        self.make_delta = self._gc_module.make_delta
        self.apply_delta = self._gc_module.apply_delta
        self.apply_delta_to_source = self._gc_module.apply_delta_to_source
136
def test_make_delta_is_typesafe(self):
    """make_delta takes two plain strings and rejects any other pairing."""
    # Baseline: two plain strings must be accepted without error.
    self.make_delta('a string', 'another string')
    # In each pair exactly one argument is an arbitrary object or a unicode
    # string, in either position.
    rejected_pairs = [
        ('a string', object()),
        ('a string', u'not a string'),
        (object(), 'a string'),
        (u'not a string', 'a string'),
    ]
    for source, target in rejected_pairs:
        self.assertRaises(TypeError, self.make_delta, source, target)
147
def test_make_noop_delta(self):
    """The delta from a text to itself has a fixed, known encoding."""
    expectations = [
        (_text1, 'M\x90M'),
        (_text2, 'N\x90N'),
        (_text3, '\x87\x01\x90\x87'),
    ]
    for text, expected in expectations:
        self.assertEqual(expected, self.make_delta(text, text))
155
def assertDeltaIn(self, delta1, delta2, delta):
156
"""Make sure that the delta bytes match one of the expectations."""
# delta1 and delta2 are the two accepted encodings; delta is the value that
# was actually produced. The test fails when delta equals neither of them.
157
# In general, the python delta matcher gives different results than the
158
# pyrex delta matcher. Both should be valid deltas, though.
# NOTE(review): the middle of the failure-message literal below is not
# visible in this view; the three values after ``%`` are interpolated into
# it. Confirm the full message against the complete file.
159
if delta not in (delta1, delta2):
160
self.fail("Delta bytes:\n"
164
% (delta, delta1, delta2))
166
def test_make_delta(self):
    """Deltas between the sample texts match one of the known encodings."""
    # NOTE(review): the ``self.assertDeltaIn(`` openers and ``delta)``
    # closers were not visible in the text under review. They are restored
    # because each pair of literals matches assertDeltaIn's two expected
    # values followed by the actual delta.
    delta = self.make_delta(_text1, _text2)
    self.assertDeltaIn(
        'N\x90/\x1fdiffer from\nagainst other text\n',
        'N\x90\x1d\x1ewhich is meant to differ from\n\x91:\x13',
        delta)
    delta = self.make_delta(_text2, _text1)
    self.assertDeltaIn(
        'M\x90/\x1ebe matched\nagainst other text\n',
        'M\x90\x1d\x1dwhich is meant to be matched\n\x91;\x13',
        delta)
    delta = self.make_delta(_text3, _text1)
    self.assertEqual('M\x90M', delta)
    delta = self.make_delta(_text3, _text2)
    self.assertDeltaIn(
        'N\x90/\x1fdiffer from\nagainst other text\n',
        'N\x90\x1d\x1ewhich is meant to differ from\n\x91:\x13',
        delta)
185
def test_make_delta_with_large_copies(self):
    """A no-op delta of a large text is split into several copies."""
    # We want to have a copy that is larger than 64kB, which forces us to
    # issue multiple copy instructions.
    big_text = _text3 * 1220
    delta = self.make_delta(big_text, big_text)
    # NOTE(review): the ``self.assertDeltaIn(`` opener and ``delta)`` closer
    # were not visible in the text under review; restored from the shape of
    # the arguments (one expected encoding, None, then the actual delta).
    self.assertDeltaIn(
        '\xdc\x86\x0a'  # Encoding the length of the uncompressed text
        '\x80'  # Copy 64kB, starting at byte 0
        '\x84\x01'  # and another 64kB starting at 64kB
        '\xb4\x02\x5c\x83',  # And the bit of tail.
        None,  # Both implementations should be identical
        delta)
198
def test_apply_delta_is_typesafe(self):
    """apply_delta takes two plain strings and rejects any other pairing."""
    noop_delta = 'M\x90M'
    # Baseline: a plain source with a plain delta must be accepted.
    self.apply_delta(_text1, noop_delta)
    # First the source is replaced by an object or a unicode string, then
    # the delta is.
    rejected_pairs = [
        (object(), noop_delta),
        (unicode(_text1), noop_delta),
        (_text1, u'M\x90M'),
        (_text1, object()),
    ]
    for source, delta in rejected_pairs:
        self.assertRaises(TypeError, self.apply_delta, source, delta)
206
def test_apply_delta(self):
    """Applying a known delta to its source reproduces the target text."""
    cases = [
        (_text1, 'N\x90/\x1fdiffer from\nagainst other text\n', _text2),
        (_text2, 'M\x90/\x1ebe matched\nagainst other text\n', _text1),
    ]
    for source, delta, expected in cases:
        target = self.apply_delta(source, delta)
        self.assertEqual(expected, target)
214
def test_apply_delta_to_source_is_safe(self):
    """apply_delta_to_source rejects bad source types and bad ranges."""
    apply_to_source = self.apply_delta_to_source
    # A source that is an arbitrary object or a unicode string is refused.
    for bad_source in (object(), u'unicode str'):
        self.assertRaises(TypeError, apply_to_source, bad_source, 0, 1)
    # Ranges that run past the three-byte source, or whose first bound is
    # larger than the second, are refused.
    for delta_start, delta_end in ((1, 4), (5, 3), (3, 2)):
        self.assertRaises(ValueError,
                          apply_to_source, 'foo', delta_start, delta_end)
229
def test_apply_delta_to_source(self):
    """A delta appended to its own source can be applied in place."""
    delta = 'N\x90/\x1fdiffer from\nagainst other text\n'
    combined = _text1 + delta
    # The delta occupies everything after the source text.
    result = self.apply_delta_to_source(combined, len(_text1), len(combined))
    self.assertEqual(_text2, result)
236
class TestMakeAndApplyCompatible(tests.TestCase):
    """Round-trip texts through each make_delta/apply_delta pairing."""

    scenarios = two_way_scenarios()

    make_delta = None  # Set by load_tests
    apply_delta = None  # Set by load_tests

    def assertMakeAndApply(self, source, target):
        """Assert that generating a delta and applying gives success."""
        rebuilt = self.apply_delta(source, self.make_delta(source, target))
        self.assertEqualDiff(target, rebuilt)

    def test_direct(self):
        """Each pair of sample texts round-trips in both directions."""
        pairs = ((_text1, _text2), (_text1, _text3), (_text2, _text3))
        for first, second in pairs:
            self.assertMakeAndApply(first, second)
            self.assertMakeAndApply(second, first)
258
class TestDeltaIndex(tests.TestCase):
    """Tests for the DeltaIndex type of the compiled groupcompress module."""

    def setUp(self):
        """Require the compiled module and keep a reference to it."""
        # NOTE(review): this ``def`` line and the tail of the first comment
        # below were not visible in the text under review; both are restored
        # from the surrounding statements.
        super(TestDeltaIndex, self).setUp()
        # This test isn't multiplied, because we only have DeltaIndex for the
        # compiled module.
        # We call this here, because _test_needs_features happens after setUp
        self.requireFeature(compiled_groupcompress_feature)
        self._gc_module = compiled_groupcompress_feature.module
269
# Checks the repr of a DeltaIndex constructed directly from one source text.
# NOTE(review): the ``def`` line that owns these two statements is not
# visible in this view; confirm the enclosing test method in the full file.
# The expected repr presumably reads (number of sources, total bytes); the
# source text here is 10 bytes long.
di = self._gc_module.DeltaIndex('test text\n')
270
self.assertEqual('DeltaIndex(1, 10)', repr(di))
272
def test__dump_no_index(self):
    """A DeltaIndex with no sources added dumps None."""
    di = self._gc_module.DeltaIndex()
    # None is a singleton, so assert identity rather than equality; this
    # matches the assertIsNot(None, ...) checks used elsewhere in this file.
    self.assertIs(None, di._dump_index())
276
def test__dump_index_simple(self):
    """Dump the index built for a single source and check its entries."""
    di = self._gc_module.DeltaIndex()
    di.add_source(_text1, 0)
    # Adding the first source does not build the index by itself.
    self.assertFalse(di._has_index())
    self.assertEqual(None, di._dump_index())
    # Asking for a delta is what triggers index construction.
    _ = di.make_delta(_text1)
    self.assertTrue(di._has_index())
    hash_list, entry_list = di._dump_index()
    self.assertEqual(16, len(hash_list))
    self.assertEqual(68, len(entry_list))
    # Keep only the populated slots, remembering where each one sits.
    just_entries = [(idx, text_offset, hash_val)
                    for idx, (text_offset, hash_val)
                    in enumerate(entry_list)
                    if text_offset != 0 or hash_val != 0]
    rabin_hash = self._gc_module._rabin_hash
    # NOTE(review): the closing ``], just_entries)`` was not visible in the
    # text under review; restored because just_entries is the only value
    # this list can be compared against.
    self.assertEqual([(8, 16, rabin_hash(_text1[1:17])),
                      (25, 48, rabin_hash(_text1[33:49])),
                      (34, 32, rabin_hash(_text1[17:33])),
                      (47, 64, rabin_hash(_text1[49:65])),
                      ], just_entries)
    # This ensures that the hash map points to the location we expect it to
    for entry_idx, text_offset, hash_val in just_entries:
        self.assertEqual(entry_idx, hash_list[hash_val & 0xf])
300
def test__dump_index_two_sources(self):
    """Dump the index built for two sources and check its entries."""
    di = self._gc_module.DeltaIndex()
    di.add_source(_text1, 0)
    di.add_source(_text2, 2)
    # The second source starts after the first text plus the 2 passed as
    # the second argument of its add_source call.
    start2 = len(_text1) + 2
    self.assertTrue(di._has_index())
    hash_list, entry_list = di._dump_index()
    self.assertEqual(16, len(hash_list))
    self.assertEqual(68, len(entry_list))
    # Keep only the populated slots, remembering where each one sits.
    just_entries = [(idx, text_offset, hash_val)
                    for idx, (text_offset, hash_val)
                    in enumerate(entry_list)
                    if text_offset != 0 or hash_val != 0]
    rabin_hash = self._gc_module._rabin_hash
    # NOTE(review): the closing ``], just_entries)`` and the
    # ``self.assertTrue(`` opener further down were not visible in the text
    # under review; both are restored from the surrounding expressions.
    self.assertEqual([(8, 16, rabin_hash(_text1[1:17])),
                      (9, start2+16, rabin_hash(_text2[1:17])),
                      (25, 48, rabin_hash(_text1[33:49])),
                      (30, start2+64, rabin_hash(_text2[49:65])),
                      (34, 32, rabin_hash(_text1[17:33])),
                      (35, start2+32, rabin_hash(_text2[17:33])),
                      (43, start2+48, rabin_hash(_text2[33:49])),
                      (47, 64, rabin_hash(_text1[49:65])),
                      ], just_entries)
    # Each entry should be in the appropriate hash bucket.
    for entry_idx, text_offset, hash_val in just_entries:
        hash_idx = hash_val & 0xf
        self.assertTrue(
            hash_list[hash_idx] <= entry_idx < hash_list[hash_idx+1])
329
def test_first_add_source_doesnt_index_until_make_delta(self):
    """One source alone is not indexed until a delta is requested."""
    index = self._gc_module.DeltaIndex()
    self.assertFalse(index._has_index())
    index.add_source(_text1, 0)
    self.assertFalse(index._has_index())
    # Requesting a delta is what forces the index to be generated, and the
    # delta that comes back must be a proper one.
    produced = index.make_delta(_text2)
    self.assertTrue(index._has_index())
    expected = 'N\x90/\x1fdiffer from\nagainst other text\n'
    self.assertEqual(expected, produced)
340
def test_add_source_max_bytes_to_index(self):
    """Capping _max_bytes_to_index spreads the indexed spots out."""
    di = self._gc_module.DeltaIndex()
    di._max_bytes_to_index = 3*16
    di.add_source(_text1, 0)  # (77 bytes -1) // 3 = 25 byte stride
    di.add_source(_text3, 3)  # (135 bytes -1) // 3 = 44 byte stride
    # The second source starts after the first text plus the 3 passed as
    # the second argument of its add_source call.
    start2 = len(_text1) + 3
    hash_list, entry_list = di._dump_index()
    self.assertEqual(16, len(hash_list))
    self.assertEqual(67, len(entry_list))
    # Only the populated slots matter here, ordered by text offset.
    just_entries = sorted([(text_offset, hash_val)
                           for text_offset, hash_val in entry_list
                           if text_offset != 0 or hash_val != 0])
    rabin_hash = self._gc_module._rabin_hash
    # NOTE(review): the closing ``], just_entries)`` was not visible in the
    # text under review; restored because just_entries is the only value
    # this list can be compared against.
    self.assertEqual([(25, rabin_hash(_text1[10:26])),
                      (50, rabin_hash(_text1[35:51])),
                      (75, rabin_hash(_text1[60:76])),
                      (start2+44, rabin_hash(_text3[29:45])),
                      (start2+88, rabin_hash(_text3[73:89])),
                      (start2+132, rabin_hash(_text3[117:133])),
                      ], just_entries)
361
def test_second_add_source_triggers_make_index(self):
    """The index only exists once a second source has been added."""
    index = self._gc_module.DeltaIndex()
    self.assertFalse(index._has_index())
    # One source on its own leaves the index unbuilt ...
    index.add_source(_text1, 0)
    self.assertFalse(index._has_index())
    # ... and the second one builds it.
    index.add_source(_text2, 0)
    self.assertTrue(index._has_index())
369
def test_make_delta(self):
    """A DeltaIndex seeded with _text1 yields the known delta to _text2."""
    index = self._gc_module.DeltaIndex(_text1)
    expected = 'N\x90/\x1fdiffer from\nagainst other text\n'
    self.assertEqual(expected, index.make_delta(_text2))
374
def test_delta_against_multiple_sources(self):
    """A delta may copy from either of two sources added back to back."""
    di = self._gc_module.DeltaIndex()
    di.add_source(_first_text, 0)
    self.assertEqual(len(_first_text), di._source_offset)
    di.add_source(_second_text, 0)
    # NOTE(review): the ``di._source_offset)`` continuation was not visible
    # in the text under review; restored to mirror the check just above.
    self.assertEqual(len(_first_text) + len(_second_text),
                     di._source_offset)
    delta = di.make_delta(_third_text)
    # The delta refers to the two sources as one concatenated text.
    result = self._gc_module.apply_delta(_first_text + _second_text, delta)
    self.assertEqualDiff(_third_text, result)
    self.assertEqual('\x85\x01\x90\x14\x0chas some in '
                     '\x91v6\x03and\x91d"\x91:\n', delta)
387
def test_delta_with_offsets(self):
    """Sources added with unindexed gaps still give an applicable delta."""
    di = self._gc_module.DeltaIndex()
    di.add_source(_first_text, 5)
    self.assertEqual(len(_first_text) + 5, di._source_offset)
    di.add_source(_second_text, 10)
    # NOTE(review): the ``di._source_offset)`` continuation was not visible
    # in the text under review; restored to mirror the check just above.
    self.assertEqual(len(_first_text) + len(_second_text) + 15,
                     di._source_offset)
    delta = di.make_delta(_third_text)
    self.assertIsNot(None, delta)
    # The filler strings take up the 5 and 10 bytes that were skipped when
    # the two sources were added.
    result = self._gc_module.apply_delta(
        '12345' + _first_text + '1234567890' + _second_text, delta)
    self.assertIsNot(None, result)
    self.assertEqualDiff(_third_text, result)
    self.assertEqual('\x85\x01\x91\x05\x14\x0chas some in '
                     '\x91\x856\x03and\x91s"\x91?\n', delta)
403
def test_delta_with_delta_bytes(self):
    """Earlier deltas can themselves be indexed as sources for later ones."""
    di = self._gc_module.DeltaIndex()
    # NOTE(review): the two statements that bind ``source`` and extend it
    # with the first delta were not visible in the text under review. They
    # are restored from the later ``source += ...`` lines and from the
    # ``len(_first_text) + len(delta)`` offset check below.
    source = _first_text
    di.add_source(_first_text, 0)
    self.assertEqual(len(_first_text), di._source_offset)
    delta = di.make_delta(_second_text)
    self.assertEqual('h\tsome more\x91\x019'
                     '&previous text\nand has some extra text\n', delta)
    di.add_delta_source(delta, 0)
    source += delta
    self.assertEqual(len(_first_text) + len(delta), di._source_offset)
    second_delta = di.make_delta(_third_text)
    result = self._gc_module.apply_delta(source, second_delta)
    self.assertEqualDiff(_third_text, result)
    # We should be able to match against the
    # 'previous text\nand has some...' that was part of the delta bytes
    # Note that we don't match the 'common with the', because it isn't long
    # enough to match in the original text, and those bytes are not present
    # in the delta for the second text.
    self.assertEqual('\x85\x01\x90\x14\x1chas some in common with the '
                     '\x91S&\x03and\x91\x18,', second_delta)
    # Add this delta, and create a new delta for the same text. We should
    # find the remaining text, and only insert the short 'and' text.
    di.add_delta_source(second_delta, 0)
    source += second_delta
    third_delta = di.make_delta(_third_text)
    result = self._gc_module.apply_delta(source, third_delta)
    self.assertEqualDiff(_third_text, result)
    self.assertEqual('\x85\x01\x90\x14\x91\x7e\x1c'
                     '\x91S&\x03and\x91\x18,', third_delta)
    # Now create a delta, which we know won't be able to be 'fit' into the
    # index as it stands (the rest of this comment was not visible in the
    # text under review).
    fourth_delta = di.make_delta(_fourth_text)
    self.assertEqual(_fourth_text,
                     self._gc_module.apply_delta(source, fourth_delta))
    self.assertEqual('\x80\x01'
                     '\x7f123456789012345\nsame rabin hash\n'
                     '123456789012345\nsame rabin hash\n'
                     '123456789012345\nsame rabin hash\n'
                     '123456789012345\nsame rabin hash'
                     '\x01\n', fourth_delta)
    di.add_delta_source(fourth_delta, 0)
    source += fourth_delta
    # With the next delta, everything should be found
    fifth_delta = di.make_delta(_fourth_text)
    self.assertEqual(_fourth_text,
                     self._gc_module.apply_delta(source, fifth_delta))
    self.assertEqual('\x80\x01\x91\xa7\x7f\x01\n', fifth_delta)
453
class TestCopyInstruction(tests.TestCase):
    """Tests for the pure-Python copy-instruction encoder and decoder."""

    def assertEncode(self, expected, offset, length):
        """Assert that a copy of (offset, length) encodes to ``expected``."""
        encoded = _groupcompress_py.encode_copy_instruction(offset, length)
        if expected != encoded:
            # Compare per-byte hex values so that a mismatch is readable.
            self.assertEqual([hex(ord(e)) for e in expected],
                             [hex(ord(b)) for b in encoded])

    def assertDecode(self, exp_offset, exp_length, exp_newpos, bytes, pos):
        """Assert what the copy instruction at ``bytes[pos]`` decodes to.

        :param exp_offset: Expected copy offset.
        :param exp_length: Expected copy length.
        :param exp_newpos: Expected position after the whole instruction.
        :param bytes: The string holding the instruction.
        :param pos: Index of the instruction's command byte in ``bytes``.
        """
        cmd = ord(bytes[pos])
        # The command byte is handed to the decoder separately, so the
        # position must already point past it. Every expected new position
        # in the tests below counts that byte as consumed.
        # NOTE(review): this increment was not visible in the text under
        # review; confirm against decode_copy_instruction's contract.
        pos += 1
        out = _groupcompress_py.decode_copy_instruction(bytes, cmd, pos)
        self.assertEqual((exp_offset, exp_length, exp_newpos), out)

    def test_encode_no_length(self):
        """A 64KiB copy stores no length bytes, whatever the offset."""
        self.assertEncode('\x80', 0, 64*1024)
        self.assertEncode('\x81\x01', 1, 64*1024)
        self.assertEncode('\x81\x0a', 10, 64*1024)
        self.assertEncode('\x81\xff', 255, 64*1024)
        self.assertEncode('\x82\x01', 256, 64*1024)
        self.assertEncode('\x83\x01\x01', 257, 64*1024)
        self.assertEncode('\x8F\xff\xff\xff\xff', 0xFFFFFFFF, 64*1024)
        self.assertEncode('\x8E\xff\xff\xff', 0xFFFFFF00, 64*1024)
        self.assertEncode('\x8D\xff\xff\xff', 0xFFFF00FF, 64*1024)
        self.assertEncode('\x8B\xff\xff\xff', 0xFF00FFFF, 64*1024)
        self.assertEncode('\x87\xff\xff\xff', 0x00FFFFFF, 64*1024)
        self.assertEncode('\x8F\x04\x03\x02\x01', 0x01020304, 64*1024)

    def test_encode_no_offset(self):
        """A copy from offset 0 stores no offset bytes."""
        self.assertEncode('\x90\x01', 0, 1)
        self.assertEncode('\x90\x0a', 0, 10)
        self.assertEncode('\x90\xff', 0, 255)
        self.assertEncode('\xA0\x01', 0, 256)
        self.assertEncode('\xB0\x01\x01', 0, 257)
        self.assertEncode('\xB0\xff\xff', 0, 0xFFFF)
        # Special case, if copy == 64KiB, then we store exactly 0
        # Note that this puns with a copy of exactly 0 bytes, but we don't care
        # about that, as we would never actually copy 0 bytes
        self.assertEncode('\x80', 0, 64*1024)

    def test_encode(self):
        """Copies with both an offset and a length encode both fields."""
        self.assertEncode('\x91\x01\x01', 1, 1)
        self.assertEncode('\x91\x09\x0a', 9, 10)
        self.assertEncode('\x91\xfe\xff', 254, 255)
        self.assertEncode('\xA2\x02\x01', 512, 256)
        self.assertEncode('\xB3\x02\x01\x01\x01', 258, 257)
        self.assertEncode('\xB0\x01\x01', 0, 257)
        # Special case, if copy == 64KiB, then we store exactly 0
        # Note that this puns with a copy of exactly 0 bytes, but we don't care
        # about that, as we would never actually copy 0 bytes
        self.assertEncode('\x81\x0a', 10, 64*1024)

    def test_decode_no_length(self):
        """An instruction without length bytes decodes as a 64KiB copy."""
        # If length is 0, it is interpreted as 64KiB
        # The shortest possible instruction is a copy of 64KiB from offset 0
        self.assertDecode(0, 65536, 1, '\x80', 0)
        self.assertDecode(1, 65536, 2, '\x81\x01', 0)
        self.assertDecode(10, 65536, 2, '\x81\x0a', 0)
        self.assertDecode(255, 65536, 2, '\x81\xff', 0)
        self.assertDecode(256, 65536, 2, '\x82\x01', 0)
        self.assertDecode(257, 65536, 3, '\x83\x01\x01', 0)
        self.assertDecode(0xFFFFFFFF, 65536, 5, '\x8F\xff\xff\xff\xff', 0)
        self.assertDecode(0xFFFFFF00, 65536, 4, '\x8E\xff\xff\xff', 0)
        self.assertDecode(0xFFFF00FF, 65536, 4, '\x8D\xff\xff\xff', 0)
        self.assertDecode(0xFF00FFFF, 65536, 4, '\x8B\xff\xff\xff', 0)
        self.assertDecode(0x00FFFFFF, 65536, 4, '\x87\xff\xff\xff', 0)
        self.assertDecode(0x01020304, 65536, 5, '\x8F\x04\x03\x02\x01', 0)

    def test_decode_no_offset(self):
        """An instruction without offset bytes decodes as offset 0."""
        self.assertDecode(0, 1, 2, '\x90\x01', 0)
        self.assertDecode(0, 10, 2, '\x90\x0a', 0)
        self.assertDecode(0, 255, 2, '\x90\xff', 0)
        self.assertDecode(0, 256, 2, '\xA0\x01', 0)
        self.assertDecode(0, 257, 3, '\xB0\x01\x01', 0)
        self.assertDecode(0, 65535, 3, '\xB0\xff\xff', 0)
        # Special case, if copy == 64KiB, then we store exactly 0
        # Note that this puns with a copy of exactly 0 bytes, but we don't care
        # about that, as we would never actually copy 0 bytes
        self.assertDecode(0, 65536, 1, '\x80', 0)

    def test_decode(self):
        """Instructions carrying both fields decode to both values."""
        self.assertDecode(1, 1, 3, '\x91\x01\x01', 0)
        self.assertDecode(9, 10, 3, '\x91\x09\x0a', 0)
        self.assertDecode(254, 255, 3, '\x91\xfe\xff', 0)
        self.assertDecode(512, 256, 3, '\xA2\x02\x01', 0)
        self.assertDecode(258, 257, 5, '\xB3\x02\x01\x01\x01', 0)
        self.assertDecode(0, 257, 3, '\xB0\x01\x01', 0)

    def test_decode_not_start(self):
        """Decoding works when the instruction is not at index 0."""
        self.assertDecode(1, 1, 6, 'abc\x91\x01\x01def', 3)
        self.assertDecode(9, 10, 5, 'ab\x91\x09\x0ade', 2)
        self.assertDecode(254, 255, 6, 'not\x91\xfe\xffcopy', 3)
547
class TestBase128Int(tests.TestCase):
    """Base-128 integer encoding and decoding, per groupcompress module."""

    scenarios = module_scenarios()

    _gc_module = None  # Set by load_tests

    def assertEqualEncode(self, bytes, val):
        """Assert that ``val`` encodes to exactly ``bytes``."""
        encoded = self._gc_module.encode_base128_int(val)
        self.assertEqual(bytes, encoded)

    def assertEqualDecode(self, val, num_decode, bytes):
        """Assert that ``bytes`` decodes to ``(val, num_decode)``."""
        decoded = self._gc_module.decode_base128_int(bytes)
        self.assertEqual((val, num_decode), decoded)

    def test_encode(self):
        """Known integers encode to their known byte strings."""
        known_encodings = [
            ('\x01', 1),
            ('\x02', 2),
            ('\x7f', 127),
            ('\x80\x01', 128),
            ('\xff\x01', 255),
            ('\x80\x02', 256),
            ('\xff\xff\xff\xff\x0f', 0xFFFFFFFF),
        ]
        for encoded, value in known_encodings:
            self.assertEqualEncode(encoded, value)

    def test_decode(self):
        """Known byte strings decode to their value and consumed length."""
        known_decodings = [
            (1, 1, '\x01'),
            (2, 1, '\x02'),
            (127, 1, '\x7f'),
            (128, 2, '\x80\x01'),
            (255, 2, '\xff\x01'),
            (256, 2, '\x80\x02'),
            (0xFFFFFFFF, 5, '\xff\xff\xff\xff\x0f'),
        ]
        for value, consumed, encoded in known_decodings:
            self.assertEqualDecode(value, consumed, encoded)

    def test_decode_with_trailing_bytes(self):
        """Bytes after the encoded integer are not counted as consumed."""
        with_trailing = [
            (1, 1, '\x01abcdef'),
            (127, 1, '\x7f\x01'),
            (128, 2, '\x80\x01abcdef'),
            (255, 2, '\xff\x01\xff'),
        ]
        for value, consumed, encoded in with_trailing:
            self.assertEqualDecode(value, consumed, encoded)