1
# Copyright (C) 2008-2011 Canonical Ltd
3
# This program is free software; you can redistribute it and/or modify
4
# it under the terms of the GNU General Public License as published by
5
# the Free Software Foundation; either version 2 of the License, or
6
# (at your option) any later version.
8
# This program is distributed in the hope that it will be useful,
9
# but WITHOUT ANY WARRANTY; without even the implied warranty of
10
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11
# GNU General Public License for more details.
13
# You should have received a copy of the GNU General Public License
14
# along with this program; if not, write to the Free Software
15
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
17
"""Tests for the python and pyrex extensions of groupcompress"""
25
from .scenarios import (
26
load_tests_apply_scenarios,
28
from ..sixish import (
36
def module_scenarios():
38
('python', {'_gc_module': _groupcompress_py}),
40
if compiled_groupcompress_feature.available():
41
gc_module = compiled_groupcompress_feature.module
42
scenarios.append(('C',
43
{'_gc_module': gc_module}))
47
def two_way_scenarios():
49
('PP', {'make_delta': _groupcompress_py.make_delta,
50
'apply_delta': _groupcompress_py.apply_delta})
52
if compiled_groupcompress_feature.available():
53
gc_module = compiled_groupcompress_feature.module
55
('CC', {'make_delta': gc_module.make_delta,
56
'apply_delta': gc_module.apply_delta}),
57
('PC', {'make_delta': _groupcompress_py.make_delta,
58
'apply_delta': gc_module.apply_delta}),
59
('CP', {'make_delta': gc_module.make_delta,
60
'apply_delta': _groupcompress_py.apply_delta}),
65
load_tests = load_tests_apply_scenarios
68
compiled_groupcompress_feature = features.ModuleAvailableFeature(
69
'breezy.bzr._groupcompress_pyx')
74
which is meant to be matched
81
which is meant to differ from
88
which is meant to be matched
92
at the end of the file
98
common with the next text
102
some more bit of text, that
103
does not have much in
104
common with the previous text
105
and has some extra text
111
has some in common with the previous text
112
and has some extra text
114
common with the next text
129
class TestMakeAndApplyDelta(tests.TestCase):
131
scenarios = module_scenarios()
132
_gc_module = None # Set by load_tests
135
super(TestMakeAndApplyDelta, self).setUp()
136
self.make_delta = self._gc_module.make_delta
137
self.apply_delta = self._gc_module.apply_delta
138
self.apply_delta_to_source = self._gc_module.apply_delta_to_source
140
def test_make_delta_is_typesafe(self):
141
self.make_delta(b'a string', b'another string')
143
def _check_make_delta(string1, string2):
144
self.assertRaises(TypeError, self.make_delta, string1, string2)
146
_check_make_delta(b'a string', object())
147
_check_make_delta(b'a string', u'not a string')
148
_check_make_delta(object(), b'a string')
149
_check_make_delta(u'not a string', b'a string')
151
def test_make_noop_delta(self):
152
ident_delta = self.make_delta(_text1, _text1)
153
self.assertEqual(b'M\x90M', ident_delta)
154
ident_delta = self.make_delta(_text2, _text2)
155
self.assertEqual(b'N\x90N', ident_delta)
156
ident_delta = self.make_delta(_text3, _text3)
157
self.assertEqual(b'\x87\x01\x90\x87', ident_delta)
159
def assertDeltaIn(self, delta1, delta2, delta):
160
"""Make sure that the delta bytes match one of the expectations."""
161
# In general, the python delta matcher gives different results than the
162
# pyrex delta matcher. Both should be valid deltas, though.
163
if delta not in (delta1, delta2):
164
self.fail(b"Delta bytes:\n"
168
% (delta, delta1, delta2))
170
def test_make_delta(self):
171
delta = self.make_delta(_text1, _text2)
173
b'N\x90/\x1fdiffer from\nagainst other text\n',
174
b'N\x90\x1d\x1ewhich is meant to differ from\n\x91:\x13',
176
delta = self.make_delta(_text2, _text1)
178
b'M\x90/\x1ebe matched\nagainst other text\n',
179
b'M\x90\x1d\x1dwhich is meant to be matched\n\x91;\x13',
181
delta = self.make_delta(_text3, _text1)
182
self.assertEqual(b'M\x90M', delta)
183
delta = self.make_delta(_text3, _text2)
185
b'N\x90/\x1fdiffer from\nagainst other text\n',
186
b'N\x90\x1d\x1ewhich is meant to differ from\n\x91:\x13',
189
def test_make_delta_with_large_copies(self):
190
# We want to have a copy that is larger than 64kB, which forces us to
191
# issue multiple copy instructions.
192
big_text = _text3 * 1220
193
delta = self.make_delta(big_text, big_text)
195
b'\xdc\x86\x0a' # Encoding the length of the uncompressed text
196
b'\x80' # Copy 64kB, starting at byte 0
197
b'\x84\x01' # and another 64kB starting at 64kB
198
b'\xb4\x02\x5c\x83', # And the bit of tail.
199
None, # Both implementations should be identical
202
def test_apply_delta_is_typesafe(self):
203
self.apply_delta(_text1, b'M\x90M')
204
self.assertRaises(TypeError, self.apply_delta, object(), b'M\x90M')
205
self.assertRaises(TypeError, self.apply_delta,
206
_text1.decode('latin1'), b'M\x90M')
207
self.assertRaises(TypeError, self.apply_delta, _text1, u'M\x90M')
208
self.assertRaises(TypeError, self.apply_delta, _text1, object())
210
def test_apply_delta(self):
211
target = self.apply_delta(_text1,
212
b'N\x90/\x1fdiffer from\nagainst other text\n')
213
self.assertEqual(_text2, target)
214
target = self.apply_delta(_text2,
215
b'M\x90/\x1ebe matched\nagainst other text\n')
216
self.assertEqual(_text1, target)
218
def test_apply_delta_to_source_is_safe(self):
219
self.assertRaises(TypeError,
220
self.apply_delta_to_source, object(), 0, 1)
221
self.assertRaises(TypeError,
222
self.apply_delta_to_source, u'unicode str', 0, 1)
224
self.assertRaises(ValueError,
225
self.apply_delta_to_source, b'foo', 1, 4)
227
self.assertRaises(ValueError,
228
self.apply_delta_to_source, b'foo', 5, 3)
230
self.assertRaises(ValueError,
231
self.apply_delta_to_source, b'foo', 3, 2)
233
def test_apply_delta_to_source(self):
234
source_and_delta = (_text1
235
+ b'N\x90/\x1fdiffer from\nagainst other text\n')
236
self.assertEqual(_text2, self.apply_delta_to_source(source_and_delta,
237
len(_text1), len(source_and_delta)))
240
class TestMakeAndApplyCompatible(tests.TestCase):
242
scenarios = two_way_scenarios()
244
make_delta = None # Set by load_tests
245
apply_delta = None # Set by load_tests
247
def assertMakeAndApply(self, source, target):
248
"""Assert that generating a delta and applying gives success."""
249
delta = self.make_delta(source, target)
250
bytes = self.apply_delta(source, delta)
251
self.assertEqualDiff(target, bytes)
253
def test_direct(self):
254
self.assertMakeAndApply(_text1, _text2)
255
self.assertMakeAndApply(_text2, _text1)
256
self.assertMakeAndApply(_text1, _text3)
257
self.assertMakeAndApply(_text3, _text1)
258
self.assertMakeAndApply(_text2, _text3)
259
self.assertMakeAndApply(_text3, _text2)
262
class TestDeltaIndex(tests.TestCase):
265
super(TestDeltaIndex, self).setUp()
266
# This test isn't multiplied, because we only have DeltaIndex for the
268
# We call this here, because _test_needs_features happens after setUp
269
self.requireFeature(compiled_groupcompress_feature)
270
self._gc_module = compiled_groupcompress_feature.module
273
di = self._gc_module.DeltaIndex('test text\n')
274
self.assertEqual('DeltaIndex(1, 10)', repr(di))
276
def test__dump_no_index(self):
277
di = self._gc_module.DeltaIndex()
278
self.assertEqual(None, di._dump_index())
280
def test__dump_index_simple(self):
281
di = self._gc_module.DeltaIndex()
282
di.add_source(_text1, 0)
283
self.assertFalse(di._has_index())
284
self.assertEqual(None, di._dump_index())
285
_ = di.make_delta(_text1)
286
self.assertTrue(di._has_index())
287
hash_list, entry_list = di._dump_index()
288
self.assertEqual(16, len(hash_list))
289
self.assertEqual(68, len(entry_list))
290
just_entries = [(idx, text_offset, hash_val)
291
for idx, (text_offset, hash_val)
292
in enumerate(entry_list)
293
if text_offset != 0 or hash_val != 0]
294
rabin_hash = self._gc_module._rabin_hash
295
self.assertEqual([(8, 16, rabin_hash(_text1[1:17])),
296
(25, 48, rabin_hash(_text1[33:49])),
297
(34, 32, rabin_hash(_text1[17:33])),
298
(47, 64, rabin_hash(_text1[49:65])),
300
# This ensures that the hash map points to the location we expect it to
301
for entry_idx, text_offset, hash_val in just_entries:
302
self.assertEqual(entry_idx, hash_list[hash_val & 0xf])
304
def test__dump_index_two_sources(self):
305
di = self._gc_module.DeltaIndex()
306
di.add_source(_text1, 0)
307
di.add_source(_text2, 2)
308
start2 = len(_text1) + 2
309
self.assertTrue(di._has_index())
310
hash_list, entry_list = di._dump_index()
311
self.assertEqual(16, len(hash_list))
312
self.assertEqual(68, len(entry_list))
313
just_entries = [(idx, text_offset, hash_val)
314
for idx, (text_offset, hash_val)
315
in enumerate(entry_list)
316
if text_offset != 0 or hash_val != 0]
317
rabin_hash = self._gc_module._rabin_hash
318
self.assertEqual([(8, 16, rabin_hash(_text1[1:17])),
319
(9, start2+16, rabin_hash(_text2[1:17])),
320
(25, 48, rabin_hash(_text1[33:49])),
321
(30, start2+64, rabin_hash(_text2[49:65])),
322
(34, 32, rabin_hash(_text1[17:33])),
323
(35, start2+32, rabin_hash(_text2[17:33])),
324
(43, start2+48, rabin_hash(_text2[33:49])),
325
(47, 64, rabin_hash(_text1[49:65])),
327
# Each entry should be in the appropriate hash bucket.
328
for entry_idx, text_offset, hash_val in just_entries:
329
hash_idx = hash_val & 0xf
331
hash_list[hash_idx] <= entry_idx < hash_list[hash_idx+1])
333
def test_first_add_source_doesnt_index_until_make_delta(self):
334
di = self._gc_module.DeltaIndex()
335
self.assertFalse(di._has_index())
336
di.add_source(_text1, 0)
337
self.assertFalse(di._has_index())
338
# However, asking to make a delta will trigger the index to be
339
# generated, and will generate a proper delta
340
delta = di.make_delta(_text2)
341
self.assertTrue(di._has_index())
342
self.assertEqual('N\x90/\x1fdiffer from\nagainst other text\n', delta)
344
def test_add_source_max_bytes_to_index(self):
345
di = self._gc_module.DeltaIndex()
346
di._max_bytes_to_index = 3*16
347
di.add_source(_text1, 0) # (77 bytes -1) // 3 = 25 byte stride
348
di.add_source(_text3, 3) # (135 bytes -1) // 3 = 44 byte stride
349
start2 = len(_text1) + 3
350
hash_list, entry_list = di._dump_index()
351
self.assertEqual(16, len(hash_list))
352
self.assertEqual(67, len(entry_list))
353
just_entries = sorted([(text_offset, hash_val)
354
for text_offset, hash_val in entry_list
355
if text_offset != 0 or hash_val != 0])
356
rabin_hash = self._gc_module._rabin_hash
357
self.assertEqual([(25, rabin_hash(_text1[10:26])),
358
(50, rabin_hash(_text1[35:51])),
359
(75, rabin_hash(_text1[60:76])),
360
(start2+44, rabin_hash(_text3[29:45])),
361
(start2+88, rabin_hash(_text3[73:89])),
362
(start2+132, rabin_hash(_text3[117:133])),
365
def test_second_add_source_triggers_make_index(self):
366
di = self._gc_module.DeltaIndex()
367
self.assertFalse(di._has_index())
368
di.add_source(_text1, 0)
369
self.assertFalse(di._has_index())
370
di.add_source(_text2, 0)
371
self.assertTrue(di._has_index())
373
def test_make_delta(self):
374
di = self._gc_module.DeltaIndex(_text1)
375
delta = di.make_delta(_text2)
376
self.assertEqual('N\x90/\x1fdiffer from\nagainst other text\n', delta)
378
def test_delta_against_multiple_sources(self):
379
di = self._gc_module.DeltaIndex()
380
di.add_source(_first_text, 0)
381
self.assertEqual(len(_first_text), di._source_offset)
382
di.add_source(_second_text, 0)
383
self.assertEqual(len(_first_text) + len(_second_text),
385
delta = di.make_delta(_third_text)
386
result = self._gc_module.apply_delta(_first_text + _second_text, delta)
387
self.assertEqualDiff(_third_text, result)
388
self.assertEqual('\x85\x01\x90\x14\x0chas some in '
389
'\x91v6\x03and\x91d"\x91:\n', delta)
391
def test_delta_with_offsets(self):
392
di = self._gc_module.DeltaIndex()
393
di.add_source(_first_text, 5)
394
self.assertEqual(len(_first_text) + 5, di._source_offset)
395
di.add_source(_second_text, 10)
396
self.assertEqual(len(_first_text) + len(_second_text) + 15,
398
delta = di.make_delta(_third_text)
399
self.assertIsNot(None, delta)
400
result = self._gc_module.apply_delta(
401
'12345' + _first_text + '1234567890' + _second_text, delta)
402
self.assertIsNot(None, result)
403
self.assertEqualDiff(_third_text, result)
404
self.assertEqual('\x85\x01\x91\x05\x14\x0chas some in '
405
'\x91\x856\x03and\x91s"\x91?\n', delta)
407
def test_delta_with_delta_bytes(self):
408
di = self._gc_module.DeltaIndex()
410
di.add_source(_first_text, 0)
411
self.assertEqual(len(_first_text), di._source_offset)
412
delta = di.make_delta(_second_text)
413
self.assertEqual('h\tsome more\x91\x019'
414
'&previous text\nand has some extra text\n', delta)
415
di.add_delta_source(delta, 0)
417
self.assertEqual(len(_first_text) + len(delta), di._source_offset)
418
second_delta = di.make_delta(_third_text)
419
result = self._gc_module.apply_delta(source, second_delta)
420
self.assertEqualDiff(_third_text, result)
421
# We should be able to match against the
422
# 'previous text\nand has some...' that was part of the delta bytes
423
# Note that we don't match the 'common with the', because it isn't long
424
# enough to match in the original text, and those bytes are not present
425
# in the delta for the second text.
426
self.assertEqual('\x85\x01\x90\x14\x1chas some in common with the '
427
'\x91S&\x03and\x91\x18,', second_delta)
428
# Add this delta, and create a new delta for the same text. We should
429
# find the remaining text, and only insert the short 'and' text.
430
di.add_delta_source(second_delta, 0)
431
source += second_delta
432
third_delta = di.make_delta(_third_text)
433
result = self._gc_module.apply_delta(source, third_delta)
434
self.assertEqualDiff(_third_text, result)
435
self.assertEqual('\x85\x01\x90\x14\x91\x7e\x1c'
436
'\x91S&\x03and\x91\x18,', third_delta)
437
# Now create a delta, which we know won't be able to be 'fit' into the
439
fourth_delta = di.make_delta(_fourth_text)
440
self.assertEqual(_fourth_text,
441
self._gc_module.apply_delta(source, fourth_delta))
442
self.assertEqual('\x80\x01'
443
'\x7f123456789012345\nsame rabin hash\n'
444
'123456789012345\nsame rabin hash\n'
445
'123456789012345\nsame rabin hash\n'
446
'123456789012345\nsame rabin hash'
447
'\x01\n', fourth_delta)
448
di.add_delta_source(fourth_delta, 0)
449
source += fourth_delta
450
# With the next delta, everything should be found
451
fifth_delta = di.make_delta(_fourth_text)
452
self.assertEqual(_fourth_text,
453
self._gc_module.apply_delta(source, fifth_delta))
454
self.assertEqual('\x80\x01\x91\xa7\x7f\x01\n', fifth_delta)
457
class TestCopyInstruction(tests.TestCase):
459
def assertEncode(self, expected, offset, length):
460
data = _groupcompress_py.encode_copy_instruction(offset, length)
461
self.assertEqual(expected, data)
463
def assertDecode(self, exp_offset, exp_length, exp_newpos, data, pos):
464
cmd = indexbytes(data, pos)
466
out = _groupcompress_py.decode_copy_instruction(data, cmd, pos)
467
self.assertEqual((exp_offset, exp_length, exp_newpos), out)
469
def test_encode_no_length(self):
470
self.assertEncode(b'\x80', 0, 64*1024)
471
self.assertEncode(b'\x81\x01', 1, 64*1024)
472
self.assertEncode(b'\x81\x0a', 10, 64*1024)
473
self.assertEncode(b'\x81\xff', 255, 64*1024)
474
self.assertEncode(b'\x82\x01', 256, 64*1024)
475
self.assertEncode(b'\x83\x01\x01', 257, 64*1024)
476
self.assertEncode(b'\x8F\xff\xff\xff\xff', 0xFFFFFFFF, 64*1024)
477
self.assertEncode(b'\x8E\xff\xff\xff', 0xFFFFFF00, 64*1024)
478
self.assertEncode(b'\x8D\xff\xff\xff', 0xFFFF00FF, 64*1024)
479
self.assertEncode(b'\x8B\xff\xff\xff', 0xFF00FFFF, 64*1024)
480
self.assertEncode(b'\x87\xff\xff\xff', 0x00FFFFFF, 64*1024)
481
self.assertEncode(b'\x8F\x04\x03\x02\x01', 0x01020304, 64*1024)
483
def test_encode_no_offset(self):
484
self.assertEncode(b'\x90\x01', 0, 1)
485
self.assertEncode(b'\x90\x0a', 0, 10)
486
self.assertEncode(b'\x90\xff', 0, 255)
487
self.assertEncode(b'\xA0\x01', 0, 256)
488
self.assertEncode(b'\xB0\x01\x01', 0, 257)
489
self.assertEncode(b'\xB0\xff\xff', 0, 0xFFFF)
490
# Special case, if copy == 64KiB, then we store exactly 0
491
# Note that this puns with a copy of exactly 0 bytes, but we don't care
492
# about that, as we would never actually copy 0 bytes
493
self.assertEncode(b'\x80', 0, 64*1024)
495
def test_encode(self):
496
self.assertEncode(b'\x91\x01\x01', 1, 1)
497
self.assertEncode(b'\x91\x09\x0a', 9, 10)
498
self.assertEncode(b'\x91\xfe\xff', 254, 255)
499
self.assertEncode(b'\xA2\x02\x01', 512, 256)
500
self.assertEncode(b'\xB3\x02\x01\x01\x01', 258, 257)
501
self.assertEncode(b'\xB0\x01\x01', 0, 257)
502
# Special case, if copy == 64KiB, then we store exactly 0
503
# Note that this puns with a copy of exactly 0 bytes, but we don't care
504
# about that, as we would never actually copy 0 bytes
505
self.assertEncode(b'\x81\x0a', 10, 64*1024)
507
def test_decode_no_length(self):
508
# If length is 0, it is interpreted as 64KiB
509
# The shortest possible instruction is a copy of 64KiB from offset 0
510
self.assertDecode(0, 65536, 1, b'\x80', 0)
511
self.assertDecode(1, 65536, 2, b'\x81\x01', 0)
512
self.assertDecode(10, 65536, 2, b'\x81\x0a', 0)
513
self.assertDecode(255, 65536, 2, b'\x81\xff', 0)
514
self.assertDecode(256, 65536, 2, b'\x82\x01', 0)
515
self.assertDecode(257, 65536, 3, b'\x83\x01\x01', 0)
516
self.assertDecode(0xFFFFFFFF, 65536, 5, b'\x8F\xff\xff\xff\xff', 0)
517
self.assertDecode(0xFFFFFF00, 65536, 4, b'\x8E\xff\xff\xff', 0)
518
self.assertDecode(0xFFFF00FF, 65536, 4, b'\x8D\xff\xff\xff', 0)
519
self.assertDecode(0xFF00FFFF, 65536, 4, b'\x8B\xff\xff\xff', 0)
520
self.assertDecode(0x00FFFFFF, 65536, 4, b'\x87\xff\xff\xff', 0)
521
self.assertDecode(0x01020304, 65536, 5, b'\x8F\x04\x03\x02\x01', 0)
523
def test_decode_no_offset(self):
524
self.assertDecode(0, 1, 2, b'\x90\x01', 0)
525
self.assertDecode(0, 10, 2, b'\x90\x0a', 0)
526
self.assertDecode(0, 255, 2, b'\x90\xff', 0)
527
self.assertDecode(0, 256, 2, b'\xA0\x01', 0)
528
self.assertDecode(0, 257, 3, b'\xB0\x01\x01', 0)
529
self.assertDecode(0, 65535, 3, b'\xB0\xff\xff', 0)
530
# Special case, if copy == 64KiB, then we store exactly 0
531
# Note that this puns with a copy of exactly 0 bytes, but we don't care
532
# about that, as we would never actually copy 0 bytes
533
self.assertDecode(0, 65536, 1, b'\x80', 0)
535
def test_decode(self):
536
self.assertDecode(1, 1, 3, b'\x91\x01\x01', 0)
537
self.assertDecode(9, 10, 3, b'\x91\x09\x0a', 0)
538
self.assertDecode(254, 255, 3, b'\x91\xfe\xff', 0)
539
self.assertDecode(512, 256, 3, b'\xA2\x02\x01', 0)
540
self.assertDecode(258, 257, 5, b'\xB3\x02\x01\x01\x01', 0)
541
self.assertDecode(0, 257, 3, b'\xB0\x01\x01', 0)
543
def test_decode_not_start(self):
544
self.assertDecode(1, 1, 6, b'abc\x91\x01\x01def', 3)
545
self.assertDecode(9, 10, 5, b'ab\x91\x09\x0ade', 2)
546
self.assertDecode(254, 255, 6, b'not\x91\xfe\xffcopy', 3)
549
class TestBase128Int(tests.TestCase):
551
scenarios = module_scenarios()
553
_gc_module = None # Set by load_tests
555
def assertEqualEncode(self, bytes, val):
556
self.assertEqual(bytes, self._gc_module.encode_base128_int(val))
558
def assertEqualDecode(self, val, num_decode, bytes):
559
self.assertEqual((val, num_decode),
560
self._gc_module.decode_base128_int(bytes))
562
def test_encode(self):
563
self.assertEqualEncode(b'\x01', 1)
564
self.assertEqualEncode(b'\x02', 2)
565
self.assertEqualEncode(b'\x7f', 127)
566
self.assertEqualEncode(b'\x80\x01', 128)
567
self.assertEqualEncode(b'\xff\x01', 255)
568
self.assertEqualEncode(b'\x80\x02', 256)
569
self.assertEqualEncode(b'\xff\xff\xff\xff\x0f', 0xFFFFFFFF)
571
def test_decode(self):
572
self.assertEqualDecode(1, 1, b'\x01')
573
self.assertEqualDecode(2, 1, b'\x02')
574
self.assertEqualDecode(127, 1, b'\x7f')
575
self.assertEqualDecode(128, 2, b'\x80\x01')
576
self.assertEqualDecode(255, 2, b'\xff\x01')
577
self.assertEqualDecode(256, 2, b'\x80\x02')
578
self.assertEqualDecode(0xFFFFFFFF, 5, b'\xff\xff\xff\xff\x0f')
580
def test_decode_with_trailing_bytes(self):
581
self.assertEqualDecode(1, 1, b'\x01abcdef')
582
self.assertEqualDecode(127, 1, b'\x7f\x01')
583
self.assertEqualDecode(128, 2, b'\x80\x01abcdef')
584
self.assertEqualDecode(255, 2, b'\xff\x01\xff')