1
# Copyright (C) 2005-2011, 2016 Canonical Ltd
3
# This program is free software; you can redistribute it and/or modify
4
# it under the terms of the GNU General Public License as published by
5
# the Free Software Foundation; either version 2 of the License, or
6
# (at your option) any later version.
8
# This program is distributed in the hope that it will be useful,
9
# but WITHOUT ANY WARRANTY; without even the implied warranty of
10
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11
# GNU General Public License for more details.
13
# You should have received a copy of the GNU General Public License
14
# along with this program; if not, write to the Free Software
15
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
18
# TODO: tests regarding version names
19
# TODO: rbc 20050108 test that join does not leave an inconsistent weave
22
"""test suite for weave algorithm"""
24
from pprint import pformat
29
from ..osutils import sha_string
30
from ..sixish import (
33
from . import TestCase, TestCaseInTempDir
34
from ..bzr.weave import Weave, WeaveFormatError, WeaveInvalidChecksum
35
from ..bzr.weavefile import write_weave, read_weave
38
# texts for use in testing
39
TEXT_0 = [b"Hello world"]
40
TEXT_1 = [b"Hello world",
44
class TestBase(TestCase):
46
def check_read_write(self, k):
47
"""Check the weave k can be written & re-read."""
48
from tempfile import TemporaryFile
57
self.log('serialized weave:')
61
self.log('parents: %s' % (k._parents == k2._parents))
62
self.log(' %r' % k._parents)
63
self.log(' %r' % k2._parents)
65
self.fail('read/write check failed')
68
class WeaveContains(TestBase):
69
"""Weave __contains__ operator"""
72
k = Weave(get_scope=lambda: None)
73
self.assertFalse(b'foo' in k)
74
k.add_lines(b'foo', [], TEXT_1)
75
self.assertTrue(b'foo' in k)
84
class AnnotateOne(TestBase):
88
k.add_lines(b'text0', [], TEXT_0)
89
self.assertEqual(k.annotate(b'text0'),
90
[(b'text0', TEXT_0[0])])
93
class InvalidAdd(TestBase):
94
"""Try to use invalid version number during add."""
99
self.assertRaises(errors.RevisionNotPresent,
106
class RepeatedAdd(TestBase):
107
"""Add the same version twice; harmless."""
109
def test_duplicate_add(self):
111
idx = k.add_lines(b'text0', [], TEXT_0)
112
idx2 = k.add_lines(b'text0', [], TEXT_0)
113
self.assertEqual(idx, idx2)
116
class InvalidRepeatedAdd(TestBase):
120
k.add_lines(b'basis', [], TEXT_0)
121
k.add_lines(b'text0', [], TEXT_0)
122
self.assertRaises(errors.RevisionAlreadyPresent,
126
[b'not the same text'])
127
self.assertRaises(errors.RevisionAlreadyPresent,
130
[b'basis'], # not the right parents
134
class InsertLines(TestBase):
135
"""Store a revision that adds one line to the original.
137
Look at the annotations to make sure that the first line is matched
138
and not stored repeatedly."""
143
k.add_lines(b'text0', [], [b'line 1'])
144
k.add_lines(b'text1', [b'text0'], [b'line 1', b'line 2'])
146
self.assertEqual(k.annotate(b'text0'),
147
[(b'text0', b'line 1')])
149
self.assertEqual(k.get_lines(1),
153
self.assertEqual(k.annotate(b'text1'),
154
[(b'text0', b'line 1'),
155
(b'text1', b'line 2')])
157
k.add_lines(b'text2', [b'text0'], [b'line 1', b'diverged line'])
159
self.assertEqual(k.annotate(b'text2'),
160
[(b'text0', b'line 1'),
161
(b'text2', b'diverged line')])
163
text3 = [b'line 1', b'middle line', b'line 2']
164
k.add_lines(b'text3',
165
[b'text0', b'text1'],
168
# self.log("changes to text3: " + pformat(list(k._delta(set([0, 1]),
171
self.log("k._weave=" + pformat(k._weave))
173
self.assertEqual(k.annotate(b'text3'),
174
[(b'text0', b'line 1'),
175
(b'text3', b'middle line'),
176
(b'text1', b'line 2')])
178
# now multiple insertions at different places
180
b'text4', [b'text0', b'text1', b'text3'],
181
[b'line 1', b'aaa', b'middle line', b'bbb', b'line 2', b'ccc'])
183
self.assertEqual(k.annotate(b'text4'),
184
[(b'text0', b'line 1'),
186
(b'text3', b'middle line'),
188
(b'text1', b'line 2'),
192
class DeleteLines(TestBase):
193
"""Deletion of lines from existing text.
195
Try various texts all based on a common ancestor."""
200
base_text = [b'one', b'two', b'three', b'four']
202
k.add_lines(b'text0', [], base_text)
204
texts = [[b'one', b'two', b'three'],
205
[b'two', b'three', b'four'],
207
[b'one', b'two', b'three', b'four'],
212
k.add_lines(b'text%d' % i, [b'text0'], t)
215
self.log('final weave:')
216
self.log('k._weave=' + pformat(k._weave))
218
for i in range(len(texts)):
219
self.assertEqual(k.get_lines(i + 1),
223
class SuicideDelete(TestBase):
224
"""Invalid weave which tries to add and delete simultaneously."""
231
k._weave = [(b'{', 0),
239
# Weave.get doesn't trap this anymore
242
self.assertRaises(WeaveFormatError,
247
class CannedDelete(TestBase):
248
"""Unpack canned weave with deleted lines."""
256
k._weave = [(b'{', 0),
259
b'line to be deleted',
265
sha_string(b'first lineline to be deletedlast line'),
266
sha_string(b'first linelast line')]
268
self.assertEqual(k.get_lines(0),
270
b'line to be deleted',
274
self.assertEqual(k.get_lines(1),
280
class CannedReplacement(TestBase):
281
"""Unpack canned weave with deleted lines."""
286
k._parents = [frozenset(),
289
k._weave = [(b'{', 0),
292
b'line to be deleted',
301
sha_string(b'first lineline to be deletedlast line'),
302
sha_string(b'first linereplacement linelast line')]
304
self.assertEqual(k.get_lines(0),
306
b'line to be deleted',
310
self.assertEqual(k.get_lines(1),
317
class BadWeave(TestBase):
318
"""Test that we trap an insert which should not occur."""
323
k._parents = [frozenset(),
325
k._weave = [b'bad line',
329
b' added in version 1',
339
# Weave.get doesn't trap this anymore
342
self.assertRaises(WeaveFormatError,
347
class BadInsert(TestBase):
348
"""Test that we trap an insert which should not occur."""
353
k._parents = [frozenset(),
356
frozenset([0, 1, 2]),
358
k._weave = [(b'{', 0),
361
b' added in version 1',
368
# this is not currently enforced by get
371
self.assertRaises(WeaveFormatError,
375
self.assertRaises(WeaveFormatError,
380
class InsertNested(TestBase):
381
"""Insertion with nested instructions."""
386
k._parents = [frozenset(),
389
frozenset([0, 1, 2]),
391
k._weave = [(b'{', 0),
394
b' added in version 1',
404
sha_string(b'foo {}'),
405
sha_string(b'foo { added in version 1 also from v1}'),
406
sha_string(b'foo { added in v2}'),
408
b'foo { added in version 1 added in v2 also from v1}')
411
self.assertEqual(k.get_lines(0),
415
self.assertEqual(k.get_lines(1),
417
b' added in version 1',
421
self.assertEqual(k.get_lines(2),
426
self.assertEqual(k.get_lines(3),
428
b' added in version 1',
434
class DeleteLines2(TestBase):
435
"""Test recording revisions that delete lines.
437
This relies on the weave having a way to represent lines knocked
438
out by a later revision."""
443
k.add_lines(b'text0', [], [b"line the first",
448
self.assertEqual(len(k.get_lines(0)), 4)
450
k.add_lines(b'text1', [b'text0'], [b"line the first",
453
self.assertEqual(k.get_lines(1),
457
self.assertEqual(k.annotate(b'text1'),
458
[(b'text0', b"line the first"),
459
(b'text0', b"fine")])
462
class IncludeVersions(TestBase):
463
"""Check texts that are stored across multiple revisions.
465
Here we manually create a weave with particular encoding and make
466
sure it unpacks properly.
468
Text 0 includes nothing; text 1 includes text 0 and adds some
475
k._parents = [frozenset(), frozenset([0])]
476
k._weave = [(b'{', 0),
483
k._sha1s = [sha_string(b'first line'), sha_string(
484
b'first linesecond line')]
486
self.assertEqual(k.get_lines(1),
490
self.assertEqual(k.get_lines(0),
494
class DivergedIncludes(TestBase):
495
"""Weave with two diverged texts based on version 0.
499
# FIXME make the weave, dont poke at it.
502
k._names = [b'0', b'1', b'2']
503
k._name_map = {b'0': 0, b'1': 1, b'2': 2}
504
k._parents = [frozenset(),
508
k._weave = [(b'{', 0),
515
b"alternative second line",
520
sha_string(b'first line'),
521
sha_string(b'first linesecond line'),
522
sha_string(b'first linealternative second line')]
524
self.assertEqual(k.get_lines(0),
527
self.assertEqual(k.get_lines(1),
531
self.assertEqual(k.get_lines(b'2'),
533
b"alternative second line"])
535
self.assertEqual(list(k.get_ancestry([b'2'])),
539
class ReplaceLine(TestBase):
543
text0 = [b'cheddar', b'stilton', b'gruyere']
544
text1 = [b'cheddar', b'blue vein', b'neufchatel', b'chevre']
546
k.add_lines(b'text0', [], text0)
547
k.add_lines(b'text1', [b'text0'], text1)
549
self.log('k._weave=' + pformat(k._weave))
551
self.assertEqual(k.get_lines(0), text0)
552
self.assertEqual(k.get_lines(1), text1)
555
class Merge(TestBase):
556
"""Storage of versions that merge diverged parents"""
563
[b'header', b'', b'line from 1'],
564
[b'header', b'', b'line from 2', b'more from 2'],
565
[b'header', b'', b'line from 1', b'fixup line', b'line from 2'],
568
k.add_lines(b'text0', [], texts[0])
569
k.add_lines(b'text1', [b'text0'], texts[1])
570
k.add_lines(b'text2', [b'text0'], texts[2])
571
k.add_lines(b'merge', [b'text0', b'text1', b'text2'], texts[3])
573
for i, t in enumerate(texts):
574
self.assertEqual(k.get_lines(i), t)
576
self.assertEqual(k.annotate(b'merge'),
577
[(b'text0', b'header'),
579
(b'text1', b'line from 1'),
580
(b'merge', b'fixup line'),
581
(b'text2', b'line from 2'),
584
self.assertEqual(list(k.get_ancestry([b'merge'])),
585
[b'text0', b'text1', b'text2', b'merge'])
587
self.log('k._weave=' + pformat(k._weave))
589
self.check_read_write(k)
592
class Conflicts(TestBase):
593
"""Test detection of conflicting regions during a merge.
595
A base version is inserted, then two descendents try to
596
insert different lines in the same place. These should be
597
reported as a possible conflict and forwarded to the user."""
603
k.add_lines([], [b'aaa', b'bbb'])
604
k.add_lines([0], [b'aaa', b'111', b'bbb'])
605
k.add_lines([1], [b'aaa', b'222', b'bbb'])
609
self.assertEqual([[[b'aaa']],
610
[[b'111'], [b'222']],
614
class NonConflict(TestBase):
615
"""Two descendants insert compatible changes.
617
No conflict should be reported."""
623
k.add_lines([], [b'aaa', b'bbb'])
624
k.add_lines([0], [b'111', b'aaa', b'ccc', b'bbb'])
625
k.add_lines([1], [b'aaa', b'ccc', b'bbb', b'222'])
628
class Khayyam(TestBase):
629
"""Test changes to multi-line texts, and read/write"""
631
def test_multi_line_merge(self):
633
b"""A Book of Verses underneath the Bough,
634
A Jug of Wine, a Loaf of Bread, -- and Thou
635
Beside me singing in the Wilderness --
636
Oh, Wilderness were Paradise enow!""",
638
b"""A Book of Verses underneath the Bough,
639
A Jug of Wine, a Loaf of Bread, -- and Thou
640
Beside me singing in the Wilderness --
641
Oh, Wilderness were Paradise now!""",
643
b"""A Book of poems underneath the tree,
644
A Jug of Wine, a Loaf of Bread,
646
Beside me singing in the Wilderness --
647
Oh, Wilderness were Paradise now!
651
b"""A Book of Verses underneath the Bough,
652
A Jug of Wine, a Loaf of Bread,
654
Beside me singing in the Wilderness --
655
Oh, Wilderness were Paradise now!""",
657
texts = [[l.strip() for l in t.split(b'\n')] for t in rawtexts]
663
k.add_lines(b'text%d' % i, list(parents), t)
664
parents.add(b'text%d' % i)
667
self.log("k._weave=" + pformat(k._weave))
669
for i, t in enumerate(texts):
670
self.assertEqual(k.get_lines(i), t)
672
self.check_read_write(k)
675
class JoinWeavesTests(TestBase):
678
super(JoinWeavesTests, self).setUp()
679
self.weave1 = Weave()
680
self.lines1 = [b'hello\n']
681
self.lines3 = [b'hello\n', b'cruel\n', b'world\n']
682
self.weave1.add_lines(b'v1', [], self.lines1)
683
self.weave1.add_lines(b'v2', [b'v1'], [b'hello\n', b'world\n'])
684
self.weave1.add_lines(b'v3', [b'v2'], self.lines3)
686
def test_written_detection(self):
687
# Test detection of weave file corruption.
689
# Make sure that we can detect if a weave file has
690
# been corrupted. This doesn't test all forms of corruption,
691
# but it at least helps verify the data you get, is what you want.
694
w.add_lines(b'v1', [], [b'hello\n'])
695
w.add_lines(b'v2', [b'v1'], [b'hello\n', b'there\n'])
700
# Because we are corrupting, we need to make sure we have the exact
703
b'# bzr weave file v5\n'
704
b'i\n1 f572d396fae9206628714fb2ce00f72e94f2258f\nn v1\n\n'
705
b'i 0\n1 90f265c6e75f1c8f9ab76dcf85528352c5f215ef\nn v2\n\n'
706
b'w\n{ 0\n. hello\n}\n{ 1\n. there\n}\nW\n',
709
# Change a single letter
711
b'# bzr weave file v5\n'
712
b'i\n1 f572d396fae9206628714fb2ce00f72e94f2258f\nn v1\n\n'
713
b'i 0\n1 90f265c6e75f1c8f9ab76dcf85528352c5f215ef\nn v2\n\n'
714
b'w\n{ 0\n. hello\n}\n{ 1\n. There\n}\nW\n')
718
self.assertEqual(b'hello\n', w.get_text(b'v1'))
719
self.assertRaises(WeaveInvalidChecksum, w.get_text, b'v2')
720
self.assertRaises(WeaveInvalidChecksum, w.get_lines, b'v2')
721
self.assertRaises(WeaveInvalidChecksum, w.check)
723
# Change the sha checksum
725
b'# bzr weave file v5\n'
726
b'i\n1 f572d396fae9206628714fb2ce00f72e94f2258f\nn v1\n\n'
727
b'i 0\n1 f0f265c6e75f1c8f9ab76dcf85528352c5f215ef\nn v2\n\n'
728
b'w\n{ 0\n. hello\n}\n{ 1\n. there\n}\nW\n')
732
self.assertEqual(b'hello\n', w.get_text(b'v1'))
733
self.assertRaises(WeaveInvalidChecksum, w.get_text, b'v2')
734
self.assertRaises(WeaveInvalidChecksum, w.get_lines, b'v2')
735
self.assertRaises(WeaveInvalidChecksum, w.check)
738
class TestWeave(TestCase):
740
def test_allow_reserved_false(self):
741
w = Weave('name', allow_reserved=False)
742
# Add lines is checked at the WeaveFile level, not at the Weave level
743
w.add_lines(b'name:', [], TEXT_1)
744
# But get_lines is checked at this level
745
self.assertRaises(errors.ReservedId, w.get_lines, b'name:')
747
def test_allow_reserved_true(self):
748
w = Weave('name', allow_reserved=True)
749
w.add_lines(b'name:', [], TEXT_1)
750
self.assertEqual(TEXT_1, w.get_lines(b'name:'))
753
class InstrumentedWeave(Weave):
754
"""Keep track of how many times functions are called."""
756
def __init__(self, weave_name=None):
757
self._extract_count = 0
758
Weave.__init__(self, weave_name=weave_name)
760
def _extract(self, versions):
761
self._extract_count += 1
762
return Weave._extract(self, versions)
765
class TestNeedsReweave(TestCase):
766
"""Internal corner cases for when reweave is needed."""
768
def test_compatible_parents(self):
770
my_parents = {1, 2, 3}
772
self.assertTrue(w1._compatible_parents(my_parents, {3}))
774
self.assertTrue(w1._compatible_parents(my_parents, set(my_parents)))
775
# same empty corner case
776
self.assertTrue(w1._compatible_parents(set(), set()))
777
# other cannot contain stuff my_parents does not
778
self.assertFalse(w1._compatible_parents(set(), {1}))
779
self.assertFalse(w1._compatible_parents(my_parents, {1, 2, 3, 4}))
780
self.assertFalse(w1._compatible_parents(my_parents, {4}))
783
class TestWeaveFile(TestCaseInTempDir):
785
def test_empty_file(self):
786
with open('empty.weave', 'wb+') as f:
787
self.assertRaises(WeaveFormatError, read_weave, f)