    def test_get_delta(self):
        f = self.reopen_file()
        sha1s = self._setup_for_deltas(f)
        expected_delta = (None, '6bfa09d82ce3e898ad4641ae13dd4fdb9cf0d76b', False,
                          [(0, 0, 1, [('base', 'line\n')])])
        self.assertEqual(expected_delta, f.get_delta('base'))
        next_parent = 'base'
        text_name = 'chain1-'
        for depth in range(26):
            new_version = text_name + '%s' % depth
            expected_delta = (next_parent, sha1s[depth], False,
                              [(depth + 1, depth + 1, 1, [(new_version, 'line\n')])])
            self.assertEqual(expected_delta, f.get_delta(new_version))
            next_parent = new_version
        next_parent = 'base'
        text_name = 'chain2-'
        for depth in range(26):
            new_version = text_name + '%s' % depth
            expected_delta = (next_parent, sha1s[depth], False,
                              [(depth + 1, depth + 1, 1, [(new_version, 'line\n')])])
            self.assertEqual(expected_delta, f.get_delta(new_version))
            next_parent = new_version
        # smoke test for eol support
        expected_delta = ('base', '264f39cab871e4cfd65b3a002f7255888bb5ed97', True, [])
        self.assertEqual(['line'], f.get_lines('noeol'))
        self.assertEqual(expected_delta, f.get_delta('noeol'))
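
    # Note: the assertions above treat the value returned by get_delta() as a
    # 4-tuple of (parent version id or None, sha1 of the full text, noeol
    # flag, hunks), where each hunk is (start, end, count, [(version, line),
    # ...]).  This shape is inferred from the expected values used in these
    # tests rather than stated here as the api's formal contract.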

    def test_get_deltas(self):
        f = self.reopen_file()
        sha1s = self._setup_for_deltas(f)
        deltas = f.get_deltas(f.versions())
        expected_delta = (None, '6bfa09d82ce3e898ad4641ae13dd4fdb9cf0d76b', False,
                          [(0, 0, 1, [('base', 'line\n')])])
        self.assertEqual(expected_delta, deltas['base'])
        next_parent = 'base'
        text_name = 'chain1-'
        for depth in range(26):
            new_version = text_name + '%s' % depth
            expected_delta = (next_parent, sha1s[depth], False,
                              [(depth + 1, depth + 1, 1, [(new_version, 'line\n')])])
            self.assertEqual(expected_delta, deltas[new_version])
            next_parent = new_version
        next_parent = 'base'
        text_name = 'chain2-'
        for depth in range(26):
            new_version = text_name + '%s' % depth
            expected_delta = (next_parent, sha1s[depth], False,
                              [(depth + 1, depth + 1, 1, [(new_version, 'line\n')])])
            self.assertEqual(expected_delta, deltas[new_version])
            next_parent = new_version
        # smoke tests for eol support
        expected_delta = ('base', '264f39cab871e4cfd65b3a002f7255888bb5ed97', True, [])
        self.assertEqual(['line'], f.get_lines('noeol'))
        self.assertEqual(expected_delta, deltas['noeol'])
        # smoke tests for eol support - two noeol in a row same content
        expected_deltas = (('noeol', '3ad7ee82dbd8f29ecba073f96e43e414b3f70a4d', True,
                            [(0, 1, 2, [(u'noeolsecond', 'line\n'), (u'noeolsecond', 'line\n')])]),
                           ('noeol', '3ad7ee82dbd8f29ecba073f96e43e414b3f70a4d', True,
                            [(0, 0, 1, [('noeolsecond', 'line\n')]), (1, 1, 0, [])]))
        self.assertEqual(['line\n', 'line'], f.get_lines('noeolsecond'))
        self.assertTrue(deltas['noeolsecond'] in expected_deltas)
        # two no-eol in a row, different content
        expected_delta = ('noeolsecond', '8bb553a84e019ef1149db082d65f3133b195223b', True,
                          [(1, 2, 1, [(u'noeolnotshared', 'phone\n')])])
        self.assertEqual(['line\n', 'phone'], f.get_lines('noeolnotshared'))
        self.assertEqual(expected_delta, deltas['noeolnotshared'])
        # eol following a no-eol with content change
        expected_delta = ('noeol', 'a61f6fb6cfc4596e8d88c34a308d1e724caf8977', False,
                          [(0, 1, 1, [(u'eol', 'phone\n')])])
        self.assertEqual(['phone\n'], f.get_lines('eol'))
        self.assertEqual(expected_delta, deltas['eol'])
        # eol following a no-eol with the same content
        expected_delta = ('noeol', '6bfa09d82ce3e898ad4641ae13dd4fdb9cf0d76b', False,
                          [(0, 1, 1, [(u'eolline', 'line\n')])])
        self.assertEqual(['line\n'], f.get_lines('eolline'))
        self.assertEqual(expected_delta, deltas['eolline'])
        # eol with no parents
        expected_delta = (None, '264f39cab871e4cfd65b3a002f7255888bb5ed97', True,
                          [(0, 0, 1, [(u'noeolbase', 'line\n')])])
        self.assertEqual(['line'], f.get_lines('noeolbase'))
        self.assertEqual(expected_delta, deltas['noeolbase'])
        # eol with two parents, in inverse insertion order
        expected_deltas = (('noeolbase', '264f39cab871e4cfd65b3a002f7255888bb5ed97', True,
                            [(0, 1, 1, [(u'eolbeforefirstparent', 'line\n')])]),
                           ('noeolbase', '264f39cab871e4cfd65b3a002f7255888bb5ed97', True,
                            [(0, 1, 1, [(u'eolbeforefirstparent', 'line\n')])]))
        self.assertEqual(['line'], f.get_lines('eolbeforefirstparent'))
        #self.assertTrue(deltas['eolbeforefirstparent'] in expected_deltas)

    def test_add_unicode_content(self):
        # unicode content is not permitted in versioned files.
        # versioned files version sequences of bytes only.
        vf = self.get_file()
        self.assertRaises(errors.BzrBadParameterUnicode,
            vf.add_lines, 'a', [], ['a\n', u'b\n', 'c\n'])
        self.assertRaises(
            (errors.BzrBadParameterUnicode, NotImplementedError),
            vf.add_lines_with_ghosts, 'a', [], ['a\n', u'b\n', 'c\n'])

    def test_add_follows_left_matching_blocks(self):
        """If we change left_matching_blocks, the recorded delta changes.

        Note: There are multiple correct deltas in this case, because
        we start with 1 "a" and we get 3.
        """
        vf = self.get_file()
        if isinstance(vf, WeaveFile):
            raise TestSkipped("WeaveFile ignores left_matching_blocks")
        vf.add_lines('1', [], ['a\n'])
        vf.add_lines('2', ['1'], ['a\n', 'a\n', 'a\n'],
                     left_matching_blocks=[(0, 0, 1), (1, 3, 0)])
        self.assertEqual(['a\n', 'a\n', 'a\n'], vf.get_lines('2'))
        vf.add_lines('3', ['1'], ['a\n', 'a\n', 'a\n'],
                     left_matching_blocks=[(0, 2, 1), (1, 3, 0)])
        self.assertEqual(['a\n', 'a\n', 'a\n'], vf.get_lines('3'))
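
    # The left_matching_blocks values above appear to follow the
    # SequenceMatcher-style matching block convention: (index into the
    # left/parent text, index into the new text, number of matching lines),
    # terminated by a zero-length block.  That reading is inferred from the
    # (0, 0, 1)/(0, 2, 1) plus (1, 3, 0) values used in this test, not from
    # separate documentation.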

    def test_inline_newline_throws(self):
        # inline \n characters are not permitted in lines being added
        vf = self.get_file()
        self.assertRaises(errors.BzrBadParameterContainsNewline,
            vf.add_lines, 'a', [], ['a\n\n'])
        self.assertRaises(
            (errors.BzrBadParameterContainsNewline, NotImplementedError),
            vf.add_lines_with_ghosts, 'a', [], ['a\n\n'])
        # but inline CR's are allowed
        vf.add_lines('a', [], ['a\r\n'])
        try:
            vf.add_lines_with_ghosts('b', [], ['a\r\n'])
        except NotImplementedError:
            pass

    def test_add_reserved(self):
        vf = self.get_file()
        self.assertRaises(errors.ReservedId,
            vf.add_lines, 'a:', [], ['a\n', 'b\n', 'c\n'])

    def test_add_lines_nostoresha(self):
        """When nostore_sha is supplied, re-adding existing content raises."""
        vf = self.get_file()
        empty_text = ('a', [])
        sample_text_nl = ('b', ["foo\n", "bar\n"])
        sample_text_no_nl = ('c', ["foo\n", "bar"])
        shas = []
        for version, lines in (empty_text, sample_text_nl, sample_text_no_nl):
            sha, _, _ = vf.add_lines(version, [], lines)
            shas.append(sha)
        # we now have a copy of all the lines in the vf.
        for sha, (version, lines) in zip(
                shas, (empty_text, sample_text_nl, sample_text_no_nl)):
            self.assertRaises(errors.ExistingContent,
                vf.add_lines, version + "2", [], lines,
                nostore_sha=sha)
            # and no new version should have been added.
            self.assertRaises(errors.RevisionNotPresent, vf.get_lines,
                              version + "2")

    def test_add_lines_with_ghosts_nostoresha(self):
        """When nostore_sha is supplied, re-adding existing content raises."""
        vf = self.get_file()
        empty_text = ('a', [])
        sample_text_nl = ('b', ["foo\n", "bar\n"])
        sample_text_no_nl = ('c', ["foo\n", "bar"])
        shas = []
        for version, lines in (empty_text, sample_text_nl, sample_text_no_nl):
            sha, _, _ = vf.add_lines(version, [], lines)
            shas.append(sha)
        # we now have a copy of all the lines in the vf.
        # is the test applicable to this vf implementation?
        try:
            vf.add_lines_with_ghosts('d', [], [])
        except NotImplementedError:
            raise TestSkipped("add_lines_with_ghosts is optional")
        for sha, (version, lines) in zip(
                shas, (empty_text, sample_text_nl, sample_text_no_nl)):
            self.assertRaises(errors.ExistingContent,
                vf.add_lines_with_ghosts, version + "2", [], lines,
                nostore_sha=sha)
            # and no new version should have been added.
            self.assertRaises(errors.RevisionNotPresent, vf.get_lines,
                              version + "2")

    def test_add_lines_return_value(self):
        # add_lines should return the sha1 and the text size.
        vf = self.get_file()
        empty_text = ('a', [])
        sample_text_nl = ('b', ["foo\n", "bar\n"])
        sample_text_no_nl = ('c', ["foo\n", "bar"])
        # check results for the three cases:
        for version, lines in (empty_text, sample_text_nl, sample_text_no_nl):
            # the first two elements are the same for all versioned files:
            # - the digest and the size of the text. For some versioned files
            #   additional data is returned in additional tuple elements.
            result = vf.add_lines(version, [], lines)
            self.assertEqual(3, len(result))
            self.assertEqual((osutils.sha_strings(lines), sum(map(len, lines))),
                result[0:2])
        # parents should not affect the result:
        lines = sample_text_nl[1]
        self.assertEqual((osutils.sha_strings(lines), sum(map(len, lines))),
            vf.add_lines('d', ['b', 'c'], lines)[0:2])
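
    # A minimal usage sketch based only on what the assertions above establish
    # (the first two elements of the result are the sha1 digest and the total
    # length of the text; anything after that is format specific):
    #
    #     sha1, text_length = vf.add_lines('rev-id', parents, lines)[0:2]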

    def test_get_reserved(self):
        vf = self.get_file()
        self.assertRaises(errors.ReservedId, vf.get_texts, ['b:'])
        self.assertRaises(errors.ReservedId, vf.get_lines, 'b:')
        self.assertRaises(errors.ReservedId, vf.get_text, 'b:')

    def test_make_mpdiffs(self):
        from bzrlib import multiparent
        vf = self.get_file('foo')
        sha1s = self._setup_for_deltas(vf)
        new_vf = self.get_file('bar')
        for version in multiparent.topo_iter(vf):
            mpdiff = vf.make_mpdiffs([version])[0]
            new_vf.add_mpdiffs([(version, vf.get_parents(version),
                                 vf.get_sha1(version), mpdiff)])
            self.assertEqualDiff(vf.get_text(version),
                                 new_vf.get_text(version))

    def _setup_for_deltas(self, f):
        self.assertRaises(errors.RevisionNotPresent, f.get_delta, 'base')
        self.assertFalse(f.has_version('base'))
        # add texts that should trip the knit maximum delta chain threshold
        # as well as doing parallel chains of data in knits.
        # this is done by two chains of 25 insertions
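        # An illustrative sketch (not the verbatim helper body, which also
        # sets up the eol texts checked by the delta tests) of how two such
        # chains can be built, using the 'chain1-'/'chain2-' naming those
        # tests rely on:
        #
        #     sha1s = []
        #     next_parent = 'base'
        #     for depth in range(26):
        #         new_version = 'chain1-%s' % depth
        #         f.add_lines(new_version, [next_parent],
        #                     ['line\n'] * (depth + 2))
        #         sha1s.append(f.get_sha1(new_version))
        #         next_parent = new_version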
        vf.add_lines('otherchild',
                     ['lancestor', 'base'],
                     ['base\n', 'lancestor\n', 'otherchild\n'])
        def iter_with_versions(versions, expected):
            # now we need to see what lines are returned, and how often.
            lines = {}
            progress = InstrumentedProgress()
            # iterate over the lines, passing the progress object through so
            # that its updates can be checked afterwards
            for line in vf.iter_lines_added_or_present_in_versions(versions,
                    pb=progress):
                lines.setdefault(line, 0)
                lines[line] += 1
            if [] != progress.updates:
                self.assertEqual(expected, progress.updates)
            return lines
        lines = iter_with_versions(['child', 'otherchild'],
                                   [('Walking content.', 0, 2),
                                    ('Walking content.', 1, 2),
                                    ('Walking content.', 2, 2)])
        # we must see child and otherchild
        self.assertTrue(lines[('child\n', 'child')] > 0)
        self.assertTrue(lines[('otherchild\n', 'otherchild')] > 0)
        # we dont care if we got more than that.
        lines = iter_with_versions(None, [('Walking content.', 0, 5),
                                          ('Walking content.', 1, 5),
                                          ('Walking content.', 2, 5),
                                          ('Walking content.', 3, 5),
                                          ('Walking content.', 4, 5),
                                          ('Walking content.', 5, 5)])
        # all lines must be seen at least once
        self.assertTrue(lines[('base\n', 'base')] > 0)
        self.assertTrue(lines[('lancestor\n', 'lancestor')] > 0)
        self.assertTrue(lines[('rancestor\n', 'rancestor')] > 0)
        self.assertTrue(lines[('child\n', 'child')] > 0)
        self.assertTrue(lines[('otherchild\n', 'otherchild')] > 0)

    def test_fix_parents(self):
        # some versioned files allow incorrect parents to be corrected after
        # insertion - this may not fix ancestry..
        # if they do not support it, they just do not implement it.
        # we test this as an interface test to ensure that those that *do*
        # implement it get it right.
        vf = self.get_file()
        vf.add_lines('notbase', [], [])
        vf.add_lines('base', [], [])
        try:
            vf.fix_parents('notbase', ['base'])
        except NotImplementedError:
            return
        self.assertEqual(['base'], vf.get_parents('notbase'))
        # open again, check it stuck.
        vf = self.reopen_file()
        self.assertEqual(['base'], vf.get_parents('notbase'))

    def test_fix_parents_with_ghosts(self):
        # when fixing parents, ghosts that are listed should not be ghosts
        vf = self.get_file()
        try:
            vf.add_lines_with_ghosts('notbase', ['base', 'stillghost'], [])
        except NotImplementedError:
            return
        vf.add_lines('base', [], [])
        vf.fix_parents('notbase', ['base', 'stillghost'])
        self.assertEqual(['base'], vf.get_parents('notbase'))
        # open again, check it stuck.
        vf = self.reopen_file()
        self.assertEqual(['base'], vf.get_parents('notbase'))
        # and check the ghosts
        self.assertEqual(['base', 'stillghost'],
                         vf.get_parents_with_ghosts('notbase'))

    def test_add_lines_with_ghosts(self):
        # some versioned file formats allow lines to be added with parent
        # information that references versions not yet present in the store
        # (ghosts); formats that do not support this raise NotImplementedError
        # from the *_with_ghosts apis.
        self.assertRaises(NotImplementedError, vf.get_parents_with_ghosts, 'foo')
        self.assertRaises(NotImplementedError, vf.get_graph_with_ghosts)
        vf = self.reopen_file()
        # test key graph related apis: get_ancestry, get_graph, get_parents
        # - these are ghost unaware and must not reflect ghosts
        self.assertEqual(['notbxbfse'], vf.get_ancestry('notbxbfse'))
        self.assertEqual([], vf.get_parents('notbxbfse'))
        self.assertEqual({'notbxbfse':()}, vf.get_graph())
        self.assertFalse(vf.has_version(parent_id_utf8))
        # we have _with_ghost apis to give us ghost information.
        self.assertEqual([parent_id_utf8, 'notbxbfse'],
                         vf.get_ancestry_with_ghosts(['notbxbfse']))
        self.assertEqual([parent_id_utf8], vf.get_parents_with_ghosts('notbxbfse'))
        self.assertEqual({'notbxbfse':[parent_id_utf8]}, vf.get_graph_with_ghosts())
        self.assertTrue(vf.has_ghost(parent_id_utf8))
        # if we add something that is a ghost of another, it should correct the
        # results of the prior apis
        vf.add_lines(parent_id_utf8, [], [])
        self.assertEqual([parent_id_utf8, 'notbxbfse'], vf.get_ancestry(['notbxbfse']))
        self.assertEqual([parent_id_utf8], vf.get_parents('notbxbfse'))
        self.assertEqual({parent_id_utf8:(),
                          'notbxbfse':(parent_id_utf8, ),
                          },
                         vf.get_graph())
        self.assertTrue(vf.has_version(parent_id_utf8))
        # we have _with_ghost apis to give us ghost information.
        self.assertEqual([parent_id_utf8, 'notbxbfse'],
                         vf.get_ancestry_with_ghosts(['notbxbfse']))
        self.assertEqual([parent_id_utf8], vf.get_parents_with_ghosts('notbxbfse'))
        self.assertEqual({parent_id_utf8:[],
                          'notbxbfse':[parent_id_utf8],
                          },
                         vf.get_graph_with_ghosts())
        self.assertFalse(vf.has_ghost(parent_id_utf8))
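
    # Summarizing what the assertions above exercise: the plain graph apis
    # (get_ancestry, get_parents, get_graph) never report ghost parents, the
    # *_with_ghosts variants do, and has_ghost() becomes False once the
    # formerly-ghosted version is really added.  A minimal sketch of the
    # distinction, reusing the names from the test above:
    #
    #     vf.get_parents('notbxbfse')              # [] while the parent is a ghost
    #     vf.get_parents_with_ghosts('notbxbfse')  # [parent_id_utf8] either way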

    def test_add_lines_with_ghosts_after_normal_revs(self):
        # some versioned file formats allow lines to be added with parent
        # information referencing versions that are not yet present in the
        # store (ghosts), even after ghost-free revisions have been added.
            get_transport(self.get_url('.')))


class TestPlaintextKnit(TestKnit):
    """Test a knit with no cached annotations"""

    def _factory(self, name, transport, file_mode=None, access_mode=None,
                 delta=True, create=False):
        return KnitVersionedFile(name, transport, file_mode, access_mode,
                                 KnitPlainFactory(), delta=delta,
                                 create=create)

    def get_factory(self):


class TestPlanMergeVersionedFile(TestCaseWithMemoryTransport):

    def setUp(self):
        TestCaseWithMemoryTransport.setUp(self)
        self.vf1 = KnitVersionedFile('root', self.get_transport(), create=True)
        self.vf2 = KnitVersionedFile('root', self.get_transport(), create=True)
        self.plan_merge_vf = versionedfile._PlanMergeVersionedFile('root',
            [self.vf1, self.vf2])

    def test_add_lines(self):
        self.plan_merge_vf.add_lines('a:', [], [])
        self.assertRaises(ValueError, self.plan_merge_vf.add_lines, 'a', [],
                          [])
        self.assertRaises(ValueError, self.plan_merge_vf.add_lines, 'a:', None,
                          [])
        self.assertRaises(ValueError, self.plan_merge_vf.add_lines, 'a:', [],
                          None)

    def test_ancestry(self):
        self.vf1.add_lines('A', [], [])
        self.vf1.add_lines('B', ['A'], [])
        self.plan_merge_vf.add_lines('C:', ['B'], [])
        self.plan_merge_vf.add_lines('D:', ['C:'], [])
        self.assertEqual(set(['A', 'B', 'C:', 'D:']),
                         self.plan_merge_vf.get_ancestry('D:', topo_sorted=False))

    def setup_abcde(self):
        self.vf1.add_lines('A', [], ['a'])
        self.vf1.add_lines('B', ['A'], ['b'])
        self.vf2.add_lines('C', [], ['c'])
        self.vf2.add_lines('D', ['C'], ['d'])
        self.plan_merge_vf.add_lines('E:', ['B', 'D'], ['e'])

    def test_ancestry_uses_all_versionedfiles(self):
        self.setup_abcde()
        self.assertEqual(set(['A', 'B', 'C', 'D', 'E:']),
                         self.plan_merge_vf.get_ancestry('E:', topo_sorted=False))

    def test_ancestry_raises_revision_not_present(self):
        error = self.assertRaises(errors.RevisionNotPresent,
                                  self.plan_merge_vf.get_ancestry, 'E:', False)
        self.assertContainsRe(str(error), '{E:} not present in "root"')

    def test_get_parents(self):
        self.setup_abcde()
        self.assertEqual(['A'], self.plan_merge_vf.get_parents('B'))
        self.assertEqual(['C'], self.plan_merge_vf.get_parents('D'))
        self.assertEqual(['B', 'D'], self.plan_merge_vf.get_parents('E:'))
        error = self.assertRaises(errors.RevisionNotPresent,
                                  self.plan_merge_vf.get_parents, 'F')
        self.assertContainsRe(str(error), '{F} not present in "root"')

    def test_get_lines(self):
        self.setup_abcde()
        self.assertEqual(['a'], self.plan_merge_vf.get_lines('A'))
        self.assertEqual(['c'], self.plan_merge_vf.get_lines('C'))
        self.assertEqual(['e'], self.plan_merge_vf.get_lines('E:'))
        error = self.assertRaises(errors.RevisionNotPresent,
                                  self.plan_merge_vf.get_lines, 'F')
        self.assertContainsRe(str(error), '{F} not present in "root"')


class InterString(versionedfile.InterVersionedFile):
    """An inter-versionedfile optimised code path for strings.
    """

        versionedfile.InterVersionedFile.unregister_optimiser(InterString)
        # now we should get the default InterVersionedFile object again.
        self.assertGetsDefaultInterVersionedFile(dummy_a, dummy_b)


class TestReadonlyHttpMixin(object):

    def test_readonly_http_works(self):
        # we should be able to read from http with a versioned file.
        vf = self.get_file()
        # try an empty file access
        readonly_vf = self.get_factory()('foo', get_transport(self.get_readonly_url('.')))
        self.assertEqual([], readonly_vf.versions())
        vf.add_lines('1', [], ['a\n'])
        vf.add_lines('2', ['1'], ['b\n', 'a\n'])
        readonly_vf = self.get_factory()('foo', get_transport(self.get_readonly_url('.')))
        self.assertEqual(['1', '2'], vf.versions())
        for version in readonly_vf.versions():
            readonly_vf.get_lines(version)


class TestWeaveHTTP(TestCaseWithWebserver, TestReadonlyHttpMixin):

    def get_file(self):
        return WeaveFile('foo', get_transport(self.get_url('.')), create=True)

    def get_factory(self):
        return WeaveFile


class TestKnitHTTP(TestCaseWithWebserver, TestReadonlyHttpMixin):

    def get_file(self):
        return KnitVersionedFile('foo', get_transport(self.get_url('.')),
                                 delta=True, create=True)

    def get_factory(self):
        return KnitVersionedFile


class MergeCasesMixin(object):

    def doMerge(self, base, a, b, mp):
        from cStringIO import StringIO
        from textwrap import dedent

        def addcrlf(x):
            # helper assumed from its use below: give every input line a line
            # ending, so stored texts and the expected merge output agree
            return x + '\n'

        w = self.get_file()
        w.add_lines('text0', [], map(addcrlf, base))
        w.add_lines('text1', ['text0'], map(addcrlf, a))
        w.add_lines('text2', ['text0'], map(addcrlf, b))

        self.log('merge plan:')
        p = list(w.plan_merge('text1', 'text2'))
        for state, line in p:
            self.log('%12s | %s' % (state, line[:-1]))

        mt = StringIO()
        mt.writelines(w.weave_merge(p))
        mt.seek(0)
        self.log(mt.getvalue())

        mp = map(addcrlf, mp)
        self.assertEqual(mt.readlines(), mp)
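
    # The merge machinery exercised by doMerge() works in two steps, as the
    # calls above suggest (a sketch using the same names; not a complete
    # listing of the possible plan states):
    #
    #     plan = list(w.plan_merge('text1', 'text2'))   # [(state, line), ...]
    #     merged = list(w.weave_merge(plan))
    #
    # Conflicted regions come out wrapped in '<<<<<<< ', '=======' and
    # '>>>>>>> ' marker lines, which is what the expected results in the
    # tests below encode.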

    def testOneInsert(self):

    def testSeparateInserts(self):
        self.doMerge(['aaa', 'bbb', 'ccc'],
                     ['aaa', 'xxx', 'bbb', 'ccc'],
                     ['aaa', 'bbb', 'yyy', 'ccc'],
                     ['aaa', 'xxx', 'bbb', 'yyy', 'ccc'])

    def testSameInsert(self):
        self.doMerge(['aaa', 'bbb', 'ccc'],
                     ['aaa', 'xxx', 'bbb', 'ccc'],
                     ['aaa', 'xxx', 'bbb', 'yyy', 'ccc'],
                     ['aaa', 'xxx', 'bbb', 'yyy', 'ccc'])

    overlappedInsertExpected = ['aaa', 'xxx', 'yyy', 'bbb']

    def testOverlappedInsert(self):
        self.doMerge(['aaa', 'bbb'],
                     ['aaa', 'xxx', 'yyy', 'bbb'],
                     ['aaa', 'xxx', 'bbb'], self.overlappedInsertExpected)

        # really it ought to reduce this to
        # ['aaa', 'xxx', 'yyy', 'bbb']

    def testClashReplace(self):
        self.doMerge(['aaa'],
                     ['xxx'],
                     ['yyy', 'zzz'],
                     ['<<<<<<< ', 'xxx', '=======', 'yyy', 'zzz',
                      '>>>>>>> '])

    def testNonClashInsert1(self):
        self.doMerge(['aaa'],
                     ['xxx', 'aaa'],
                     ['yyy', 'zzz'],
                     ['<<<<<<< ', 'xxx', 'aaa', '=======', 'yyy', 'zzz',
                      '>>>>>>> '])

    def testNonClashInsert2(self):
        self.doMerge(['aaa'],

    def testDeleteAndModify(self):
        """Clashing delete and modification.

        If one side modifies a region and the other deletes it then
        there should be a conflict with one side blank.
        """

        #######################################
        # skipped, not working yet
        return

        self.doMerge(['aaa', 'bbb', 'ccc'],
                     ['aaa', 'ddd', 'ccc'],
                     ['aaa', 'ccc'],
                     ['<<<<<<<< ', 'aaa', '=======', '>>>>>>> ', 'ccc'])

    def _test_merge_from_strings(self, base, a, b, expected):
        w = self.get_file()
        w.add_lines('text0', [], base.splitlines(True))
        w.add_lines('text1', ['text0'], a.splitlines(True))
        w.add_lines('text2', ['text0'], b.splitlines(True))
        self.log('merge plan:')
        p = list(w.plan_merge('text1', 'text2'))
        for state, line in p:
            self.log('%12s | %s' % (state, line[:-1]))
        self.log('merge result:')
        result_text = ''.join(w.weave_merge(p))
        self.log(result_text)
        self.assertEqualDiff(result_text, expected)

    def test_weave_merge_conflicts(self):
        # does weave merge properly handle plans that end with unchanged?
        result = ''.join(self.get_file().weave_merge([('new-a', 'hello\n')]))
        self.assertEqual(result, 'hello\n')

    def test_deletion_extended(self):
        """One side deletes, the other deletes more.
        """
        self._test_merge_from_strings(base, a, b, result)

    def test_deletion_overlap(self):
        """Delete overlapping regions with no other conflict.

        Arguably it'd be better to treat these as agreement, rather than
        conflict, but for now conflict is safer.
        """
        self._test_merge_from_strings(base, a, b, result)

    def test_agreement_deletion(self):
        """Agree to delete some lines, without conflicts."""
        self._test_merge_from_strings(base, a, b, result)

    def test_sync_on_deletion(self):
        """Specific case of merge where we can synchronize incorrectly.

        A previous version of the weave merge concluded that the two versions
        agreed on deleting line 2, and this could be a synchronization point.
        Line 1 was then considered in isolation, and thought to be deleted on
        both sides.

        It's better to consider the whole thing as a disagreement region.
        """
            a's replacement line 2
            a's replacement line 2
        self._test_merge_from_strings(base, a, b, result)


class TestKnitMerge(TestCaseWithMemoryTransport, MergeCasesMixin):

    def get_file(self, name='foo'):
        return KnitVersionedFile(name, get_transport(self.get_url('.')),
                                 delta=True, create=True)

    def log_contents(self, w):
        pass


class TestWeaveMerge(TestCaseWithMemoryTransport, MergeCasesMixin):

    def get_file(self, name='foo'):
        return WeaveFile(name, get_transport(self.get_url('.')), create=True)

    def log_contents(self, w):
        self.log('weave is:')
        tmpf = StringIO()
        write_weave(w, tmpf)
        self.log(tmpf.getvalue())

    overlappedInsertExpected = ['aaa', '<<<<<<< ', 'xxx', 'yyy', '=======',
                                'xxx', '>>>>>>> ', 'bbb']


class TestFormatSignatures(TestCaseWithMemoryTransport):

    def get_knit_file(self, name, annotated):
        if annotated:
            factory = KnitAnnotateFactory()
        else:
            factory = KnitPlainFactory()
        return KnitVersionedFile(
            name, get_transport(self.get_url('.')), create=True,
            factory=factory)

    def test_knit_format_signatures(self):
        """Different formats of knit have different signature strings."""
        knit = self.get_knit_file('a', True)
        self.assertEqual('knit-annotated', knit.get_format_signature())
        knit = self.get_knit_file('p', False)
        self.assertEqual('knit-plain', knit.get_format_signature())