1
# Copyright (C) 2005-2011, 2016 Canonical Ltd
3
# This program is free software; you can redistribute it and/or modify
4
# it under the terms of the GNU General Public License as published by
5
# the Free Software Foundation; either version 2 of the License, or
6
# (at your option) any later version.
8
# This program is distributed in the hope that it will be useful,
9
# but WITHOUT ANY WARRANTY; without even the implied warranty of
10
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11
# GNU General Public License for more details.
13
# You should have received a copy of the GNU General Public License
14
# along with this program; if not, write to the Free Software
15
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
18
# TODO: tests regarding version names
19
# TODO: rbc 20050108 test that join does not leave an inconsistent weave
22
"""test suite for weave algorithm"""
24
from io import BytesIO
25
from pprint import pformat
30
from ..osutils import sha_string
31
from . import TestCase, TestCaseInTempDir
32
from ..bzr.weave import Weave, WeaveFormatError, WeaveInvalidChecksum
33
from ..bzr.weavefile import write_weave, read_weave
36
# texts for use in testing
37
TEXT_0 = [b"Hello world"]
38
TEXT_1 = [b"Hello world",
42
class TestBase(TestCase):
44
def check_read_write(self, k):
45
"""Check the weave k can be written & re-read."""
46
from tempfile import TemporaryFile
55
self.log('serialized weave:')
59
self.log('parents: %s' % (k._parents == k2._parents))
60
self.log(' %r' % k._parents)
61
self.log(' %r' % k2._parents)
63
self.fail('read/write check failed')
66
class WeaveContains(TestBase):
67
"""Weave __contains__ operator"""
70
k = Weave(get_scope=lambda: None)
71
self.assertFalse(b'foo' in k)
72
k.add_lines(b'foo', [], TEXT_1)
73
self.assertTrue(b'foo' in k)
82
class AnnotateOne(TestBase):
86
k.add_lines(b'text0', [], TEXT_0)
87
self.assertEqual(k.annotate(b'text0'),
88
[(b'text0', TEXT_0[0])])
91
class InvalidAdd(TestBase):
92
"""Try to use invalid version number during add."""
97
self.assertRaises(errors.RevisionNotPresent,
104
class RepeatedAdd(TestBase):
105
"""Add the same version twice; harmless."""
107
def test_duplicate_add(self):
109
idx = k.add_lines(b'text0', [], TEXT_0)
110
idx2 = k.add_lines(b'text0', [], TEXT_0)
111
self.assertEqual(idx, idx2)
114
class InvalidRepeatedAdd(TestBase):
118
k.add_lines(b'basis', [], TEXT_0)
119
k.add_lines(b'text0', [], TEXT_0)
120
self.assertRaises(errors.RevisionAlreadyPresent,
124
[b'not the same text'])
125
self.assertRaises(errors.RevisionAlreadyPresent,
128
[b'basis'], # not the right parents
132
class InsertLines(TestBase):
133
"""Store a revision that adds one line to the original.
135
Look at the annotations to make sure that the first line is matched
136
and not stored repeatedly."""
141
k.add_lines(b'text0', [], [b'line 1'])
142
k.add_lines(b'text1', [b'text0'], [b'line 1', b'line 2'])
144
self.assertEqual(k.annotate(b'text0'),
145
[(b'text0', b'line 1')])
147
self.assertEqual(k.get_lines(1),
151
self.assertEqual(k.annotate(b'text1'),
152
[(b'text0', b'line 1'),
153
(b'text1', b'line 2')])
155
k.add_lines(b'text2', [b'text0'], [b'line 1', b'diverged line'])
157
self.assertEqual(k.annotate(b'text2'),
158
[(b'text0', b'line 1'),
159
(b'text2', b'diverged line')])
161
text3 = [b'line 1', b'middle line', b'line 2']
162
k.add_lines(b'text3',
163
[b'text0', b'text1'],
166
# self.log("changes to text3: " + pformat(list(k._delta(set([0, 1]),
169
self.log("k._weave=" + pformat(k._weave))
171
self.assertEqual(k.annotate(b'text3'),
172
[(b'text0', b'line 1'),
173
(b'text3', b'middle line'),
174
(b'text1', b'line 2')])
176
# now multiple insertions at different places
178
b'text4', [b'text0', b'text1', b'text3'],
179
[b'line 1', b'aaa', b'middle line', b'bbb', b'line 2', b'ccc'])
181
self.assertEqual(k.annotate(b'text4'),
182
[(b'text0', b'line 1'),
184
(b'text3', b'middle line'),
186
(b'text1', b'line 2'),
190
class DeleteLines(TestBase):
191
"""Deletion of lines from existing text.
193
Try various texts all based on a common ancestor."""
198
base_text = [b'one', b'two', b'three', b'four']
200
k.add_lines(b'text0', [], base_text)
202
texts = [[b'one', b'two', b'three'],
203
[b'two', b'three', b'four'],
205
[b'one', b'two', b'three', b'four'],
210
k.add_lines(b'text%d' % i, [b'text0'], t)
213
self.log('final weave:')
214
self.log('k._weave=' + pformat(k._weave))
216
for i in range(len(texts)):
217
self.assertEqual(k.get_lines(i + 1),
221
class SuicideDelete(TestBase):
222
"""Invalid weave which tries to add and delete simultaneously."""
229
k._weave = [(b'{', 0),
237
# Weave.get doesn't trap this anymore
240
self.assertRaises(WeaveFormatError,
245
class CannedDelete(TestBase):
246
"""Unpack canned weave with deleted lines."""
254
k._weave = [(b'{', 0),
257
b'line to be deleted',
263
sha_string(b'first lineline to be deletedlast line'),
264
sha_string(b'first linelast line')]
266
self.assertEqual(k.get_lines(0),
268
b'line to be deleted',
272
self.assertEqual(k.get_lines(1),
278
class CannedReplacement(TestBase):
279
"""Unpack canned weave with deleted lines."""
284
k._parents = [frozenset(),
287
k._weave = [(b'{', 0),
290
b'line to be deleted',
299
sha_string(b'first lineline to be deletedlast line'),
300
sha_string(b'first linereplacement linelast line')]
302
self.assertEqual(k.get_lines(0),
304
b'line to be deleted',
308
self.assertEqual(k.get_lines(1),
315
class BadWeave(TestBase):
316
"""Test that we trap an insert which should not occur."""
321
k._parents = [frozenset(),
323
k._weave = [b'bad line',
327
b' added in version 1',
337
# Weave.get doesn't trap this anymore
340
self.assertRaises(WeaveFormatError,
345
class BadInsert(TestBase):
346
"""Test that we trap an insert which should not occur."""
351
k._parents = [frozenset(),
354
frozenset([0, 1, 2]),
356
k._weave = [(b'{', 0),
359
b' added in version 1',
366
# this is not currently enforced by get
369
self.assertRaises(WeaveFormatError,
373
self.assertRaises(WeaveFormatError,
378
class InsertNested(TestBase):
379
"""Insertion with nested instructions."""
384
k._parents = [frozenset(),
387
frozenset([0, 1, 2]),
389
k._weave = [(b'{', 0),
392
b' added in version 1',
402
sha_string(b'foo {}'),
403
sha_string(b'foo { added in version 1 also from v1}'),
404
sha_string(b'foo { added in v2}'),
406
b'foo { added in version 1 added in v2 also from v1}')
409
self.assertEqual(k.get_lines(0),
413
self.assertEqual(k.get_lines(1),
415
b' added in version 1',
419
self.assertEqual(k.get_lines(2),
424
self.assertEqual(k.get_lines(3),
426
b' added in version 1',
432
class DeleteLines2(TestBase):
433
"""Test recording revisions that delete lines.
435
This relies on the weave having a way to represent lines knocked
436
out by a later revision."""
441
k.add_lines(b'text0', [], [b"line the first",
446
self.assertEqual(len(k.get_lines(0)), 4)
448
k.add_lines(b'text1', [b'text0'], [b"line the first",
451
self.assertEqual(k.get_lines(1),
455
self.assertEqual(k.annotate(b'text1'),
456
[(b'text0', b"line the first"),
457
(b'text0', b"fine")])
460
class IncludeVersions(TestBase):
461
"""Check texts that are stored across multiple revisions.
463
Here we manually create a weave with particular encoding and make
464
sure it unpacks properly.
466
Text 0 includes nothing; text 1 includes text 0 and adds some
473
k._parents = [frozenset(), frozenset([0])]
474
k._weave = [(b'{', 0),
481
k._sha1s = [sha_string(b'first line'), sha_string(
482
b'first linesecond line')]
484
self.assertEqual(k.get_lines(1),
488
self.assertEqual(k.get_lines(0),
492
class DivergedIncludes(TestBase):
493
"""Weave with two diverged texts based on version 0.
497
# FIXME: make the weave, don't poke at it.
500
k._names = [b'0', b'1', b'2']
501
k._name_map = {b'0': 0, b'1': 1, b'2': 2}
502
k._parents = [frozenset(),
506
k._weave = [(b'{', 0),
513
b"alternative second line",
518
sha_string(b'first line'),
519
sha_string(b'first linesecond line'),
520
sha_string(b'first linealternative second line')]
522
self.assertEqual(k.get_lines(0),
525
self.assertEqual(k.get_lines(1),
529
self.assertEqual(k.get_lines(b'2'),
531
b"alternative second line"])
533
self.assertEqual(list(k.get_ancestry([b'2'])),
537
class ReplaceLine(TestBase):
541
text0 = [b'cheddar', b'stilton', b'gruyere']
542
text1 = [b'cheddar', b'blue vein', b'neufchatel', b'chevre']
544
k.add_lines(b'text0', [], text0)
545
k.add_lines(b'text1', [b'text0'], text1)
547
self.log('k._weave=' + pformat(k._weave))
549
self.assertEqual(k.get_lines(0), text0)
550
self.assertEqual(k.get_lines(1), text1)
553
class Merge(TestBase):
554
"""Storage of versions that merge diverged parents"""
561
[b'header', b'', b'line from 1'],
562
[b'header', b'', b'line from 2', b'more from 2'],
563
[b'header', b'', b'line from 1', b'fixup line', b'line from 2'],
566
k.add_lines(b'text0', [], texts[0])
567
k.add_lines(b'text1', [b'text0'], texts[1])
568
k.add_lines(b'text2', [b'text0'], texts[2])
569
k.add_lines(b'merge', [b'text0', b'text1', b'text2'], texts[3])
571
for i, t in enumerate(texts):
572
self.assertEqual(k.get_lines(i), t)
574
self.assertEqual(k.annotate(b'merge'),
575
[(b'text0', b'header'),
577
(b'text1', b'line from 1'),
578
(b'merge', b'fixup line'),
579
(b'text2', b'line from 2'),
582
self.assertEqual(list(k.get_ancestry([b'merge'])),
583
[b'text0', b'text1', b'text2', b'merge'])
585
self.log('k._weave=' + pformat(k._weave))
587
self.check_read_write(k)
590
class Conflicts(TestBase):
591
"""Test detection of conflicting regions during a merge.
593
A base version is inserted, then two descendents try to
594
insert different lines in the same place. These should be
595
reported as a possible conflict and forwarded to the user."""
601
k.add_lines([], [b'aaa', b'bbb'])
602
k.add_lines([0], [b'aaa', b'111', b'bbb'])
603
k.add_lines([1], [b'aaa', b'222', b'bbb'])
607
self.assertEqual([[[b'aaa']],
608
[[b'111'], [b'222']],
612
class NonConflict(TestBase):
613
"""Two descendants insert compatible changes.
615
No conflict should be reported."""
621
k.add_lines([], [b'aaa', b'bbb'])
622
k.add_lines([0], [b'111', b'aaa', b'ccc', b'bbb'])
623
k.add_lines([1], [b'aaa', b'ccc', b'bbb', b'222'])
626
class Khayyam(TestBase):
627
"""Test changes to multi-line texts, and read/write"""
629
def test_multi_line_merge(self):
631
b"""A Book of Verses underneath the Bough,
632
A Jug of Wine, a Loaf of Bread, -- and Thou
633
Beside me singing in the Wilderness --
634
Oh, Wilderness were Paradise enow!""",
636
b"""A Book of Verses underneath the Bough,
637
A Jug of Wine, a Loaf of Bread, -- and Thou
638
Beside me singing in the Wilderness --
639
Oh, Wilderness were Paradise now!""",
641
b"""A Book of poems underneath the tree,
642
A Jug of Wine, a Loaf of Bread,
644
Beside me singing in the Wilderness --
645
Oh, Wilderness were Paradise now!
649
b"""A Book of Verses underneath the Bough,
650
A Jug of Wine, a Loaf of Bread,
652
Beside me singing in the Wilderness --
653
Oh, Wilderness were Paradise now!""",
655
texts = [[l.strip() for l in t.split(b'\n')] for t in rawtexts]
661
k.add_lines(b'text%d' % i, list(parents), t)
662
parents.add(b'text%d' % i)
665
self.log("k._weave=" + pformat(k._weave))
667
for i, t in enumerate(texts):
668
self.assertEqual(k.get_lines(i), t)
670
self.check_read_write(k)
673
class JoinWeavesTests(TestBase):
676
super(JoinWeavesTests, self).setUp()
677
self.weave1 = Weave()
678
self.lines1 = [b'hello\n']
679
self.lines3 = [b'hello\n', b'cruel\n', b'world\n']
680
self.weave1.add_lines(b'v1', [], self.lines1)
681
self.weave1.add_lines(b'v2', [b'v1'], [b'hello\n', b'world\n'])
682
self.weave1.add_lines(b'v3', [b'v2'], self.lines3)
684
def test_written_detection(self):
685
# Test detection of weave file corruption.
687
# Make sure that we can detect if a weave file has
688
# been corrupted. This doesn't test all forms of corruption,
689
# but it at least helps verify that the data you get is what you want.
692
w.add_lines(b'v1', [], [b'hello\n'])
693
w.add_lines(b'v2', [b'v1'], [b'hello\n', b'there\n'])
698
# Because we are corrupting, we need to make sure we have the exact
701
b'# bzr weave file v5\n'
702
b'i\n1 f572d396fae9206628714fb2ce00f72e94f2258f\nn v1\n\n'
703
b'i 0\n1 90f265c6e75f1c8f9ab76dcf85528352c5f215ef\nn v2\n\n'
704
b'w\n{ 0\n. hello\n}\n{ 1\n. there\n}\nW\n',
707
# Change a single letter
709
b'# bzr weave file v5\n'
710
b'i\n1 f572d396fae9206628714fb2ce00f72e94f2258f\nn v1\n\n'
711
b'i 0\n1 90f265c6e75f1c8f9ab76dcf85528352c5f215ef\nn v2\n\n'
712
b'w\n{ 0\n. hello\n}\n{ 1\n. There\n}\nW\n')
716
self.assertEqual(b'hello\n', w.get_text(b'v1'))
717
self.assertRaises(WeaveInvalidChecksum, w.get_text, b'v2')
718
self.assertRaises(WeaveInvalidChecksum, w.get_lines, b'v2')
719
self.assertRaises(WeaveInvalidChecksum, w.check)
721
# Change the sha checksum
723
b'# bzr weave file v5\n'
724
b'i\n1 f572d396fae9206628714fb2ce00f72e94f2258f\nn v1\n\n'
725
b'i 0\n1 f0f265c6e75f1c8f9ab76dcf85528352c5f215ef\nn v2\n\n'
726
b'w\n{ 0\n. hello\n}\n{ 1\n. there\n}\nW\n')
730
self.assertEqual(b'hello\n', w.get_text(b'v1'))
731
self.assertRaises(WeaveInvalidChecksum, w.get_text, b'v2')
732
self.assertRaises(WeaveInvalidChecksum, w.get_lines, b'v2')
733
self.assertRaises(WeaveInvalidChecksum, w.check)
736
class TestWeave(TestCase):
    """Exercise the ``allow_reserved`` constructor flag of Weave."""

    def test_allow_reserved_false(self):
        # With allow_reserved=False, reading the b'name:' key back is
        # expected to raise errors.ReservedId.
        reserved_key = b'name:'
        weave = Weave('name', allow_reserved=False)
        # Add lines is checked at the WeaveFile level, not at the Weave level
        weave.add_lines(reserved_key, [], TEXT_1)
        # But get_lines is checked at this level
        self.assertRaises(errors.ReservedId, weave.get_lines, reserved_key)

    def test_allow_reserved_true(self):
        # With allow_reserved=True, the same key round-trips its text.
        reserved_key = b'name:'
        weave = Weave('name', allow_reserved=True)
        weave.add_lines(reserved_key, [], TEXT_1)
        stored_lines = weave.get_lines(reserved_key)
        self.assertEqual(TEXT_1, stored_lines)
751
class InstrumentedWeave(Weave):
    """Weave subclass that counts how often ``_extract`` is invoked.

    The running total is kept in ``self._extract_count``.
    """

    def __init__(self, weave_name=None):
        # Initialise the counter before delegating to the base constructor,
        # preserving the original statement order.
        self._extract_count = 0
        super(InstrumentedWeave, self).__init__(weave_name=weave_name)

    def _extract(self, versions):
        # Record the call, then defer to the real implementation.
        self._extract_count = self._extract_count + 1
        return super(InstrumentedWeave, self)._extract(versions)
763
class TestNeedsReweave(TestCase):
764
"""Internal corner cases for when reweave is needed."""
766
def test_compatible_parents(self):
768
my_parents = {1, 2, 3}
770
self.assertTrue(w1._compatible_parents(my_parents, {3}))
772
self.assertTrue(w1._compatible_parents(my_parents, set(my_parents)))
773
# same empty corner case
774
self.assertTrue(w1._compatible_parents(set(), set()))
775
# other cannot contain stuff my_parents does not
776
self.assertFalse(w1._compatible_parents(set(), {1}))
777
self.assertFalse(w1._compatible_parents(my_parents, {1, 2, 3, 4}))
778
self.assertFalse(w1._compatible_parents(my_parents, {4}))
781
class TestWeaveFile(TestCaseInTempDir):
    """Tests for reading weaves from files on disk."""

    def test_empty_file(self):
        # A freshly created (hence empty) file must be rejected by
        # read_weave with WeaveFormatError.
        # NOTE(review): presumably TestCaseInTempDir makes the relative
        # path land in a scratch directory -- confirm against the base class.
        with open('empty.weave', 'wb+') as weave_file:
            self.assertRaises(WeaveFormatError, read_weave, weave_file)