1
# Copyright (C) 2005 by Canonical Ltd
4
# Johan Rydberg <jrydberg@gnu.org>
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
11
# This program is distributed in the hope that it will be useful,
12
# but WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14
# GNU General Public License for more details.
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
20
# Remaining to do is to figure out if get_graph should return a simple
21
# map, or a graph object of some kind.
25
import bzrlib.errors as errors
26
from bzrlib.errors import RevisionNotPresent, \
27
RevisionAlreadyPresent
28
from bzrlib.knit import KnitVersionedFile, \
30
from bzrlib.tests import TestCaseInTempDir
31
from bzrlib.trace import mutter
32
from bzrlib.transport.local import LocalTransport
33
from bzrlib.weave import Weave
36
class VersionedFileTestMixIn(object):
37
"""A mixin test class for testing VersionedFiles.
39
This is not an adaptor-style test at this point because
40
there's no dynamic substitution of versioned file implementations,
41
they are strictly controlled by their owning repositories.
46
f.add_lines('r0', [], ['a\n', 'b\n'])
47
f.add_lines('r1', ['r0'], ['b\n', 'c\n'])
48
versions = f.versions()
49
self.assertTrue('r0' in versions)
50
self.assertTrue('r1' in versions)
51
self.assertEquals(f.get_lines('r0'), ['a\n', 'b\n'])
52
self.assertEquals(f.get_text('r0'), 'a\nb\n')
53
self.assertEquals(f.get_lines('r1'), ['b\n', 'c\n'])
55
self.assertRaises(RevisionNotPresent,
56
f.add_lines, 'r2', ['foo'], [])
57
self.assertRaises(RevisionAlreadyPresent,
58
f.add_lines, 'r1', [], [])
60
def test_ancestry(self):
    """Check get_ancestry() on a small graph containing a merge.

    The versions added are r0 (no parents), r1 and r2 (both children of
    r0), r3 (child of r2) and rM (parents r1 and r2).  The ancestry of rM
    must be exactly rM, r2, r1 and r0; r3 must not appear in it.
    Including an id that was never added must raise RevisionNotPresent.
    """
    # The file under test has to be bound before use; get_file() is the
    # factory implemented by the concrete test classes.
    f = self.get_file()
    f.add_lines('r0', [], ['a\n', 'b\n'])
    f.add_lines('r1', ['r0'], ['b\n', 'c\n'])
    f.add_lines('r2', ['r0'], ['b\n', 'c\n'])
    f.add_lines('r3', ['r2'], ['b\n', 'c\n'])
    f.add_lines('rM', ['r1', 'r2'], ['b\n', 'c\n'])

    # Compare as sets: only membership is checked here, not ordering.
    versions = set(f.get_ancestry(['rM']))
    self.assertEqual(versions, set(['rM', 'r2', 'r1', 'r0']))

    # 'rX' was never added to the file.
    self.assertRaises(RevisionNotPresent,
                      f.get_ancestry, ['rM', 'rX'])
73
def test_clear_cache(self):
75
# on a new file it should not error
77
# and after adding content, doing a clear_cache and a get should work.
78
f.add_lines('0', [], ['a'])
80
self.assertEqual(['a'], f.get_lines('0'))
82
def test_clone_text(self):
    """Check that clone_text() copies a text under a new version id.

    r1 is created as a clone of r0 with r0 as its parent; it must have
    the same lines as r0 and report ['r0'] as its parents.  Cloning from
    an id that was never added must raise RevisionNotPresent, and cloning
    onto an id that already exists must raise RevisionAlreadyPresent.
    """
    # The file under test has to be bound before use; get_file() is the
    # factory implemented by the concrete test classes.
    f = self.get_file()
    f.add_lines('r0', [], ['a\n', 'b\n'])
    f.clone_text('r1', 'r0', ['r0'])
    self.assertEqual(f.get_lines('r1'), f.get_lines('r0'))
    self.assertEqual(f.get_lines('r1'), ['a\n', 'b\n'])
    self.assertEqual(f.get_parents('r1'), ['r0'])

    # 'rX' does not exist as a source; 'r1' already exists as a target.
    self.assertRaises(RevisionNotPresent,
                      f.clone_text, 'r2', 'rX', [])
    self.assertRaises(RevisionAlreadyPresent,
                      f.clone_text, 'r1', 'r0', [])
95
def test_get_parents(self):
    """Check that get_parents() returns the parents given to add_lines().

    Four parentless versions r0..r3 are added, then 'm' is added with all
    four as parents; get_parents('m') must return them in that order.
    """
    # The file under test has to be bound before use; get_file() is the
    # factory implemented by the concrete test classes.
    f = self.get_file()
    for version_id in ('r0', 'r1', 'r2', 'r3'):
        f.add_lines(version_id, [], ['a\n', 'b\n'])
    f.add_lines('m', ['r0', 'r1', 'r2', 'r3'], ['a\n', 'b\n'])
    self.assertEqual(f.get_parents('m'), ['r0', 'r1', 'r2', 'r3'])

    # NOTE(review): the arguments of this assertRaises call were cut off
    # in the source; it is completed with get_parents on the never-added
    # id 'rX', mirroring the sibling tests -- confirm the intended id.
    self.assertRaises(RevisionNotPresent,
                      f.get_parents, 'rX')
107
def test_annotate(self):
    """Check that annotate() attributes each line to a version.

    r1 replaces the first line of r0 and keeps the second, so the first
    annotated line must be attributed to 'r1' and the second to 'r0'.
    """
    # The file under test has to be bound before use; get_file() is the
    # factory implemented by the concrete test classes.
    f = self.get_file()
    f.add_lines('r0', [], ['a\n', 'b\n'])
    f.add_lines('r1', ['r0'], ['c\n', 'b\n'])
    origins = f.annotate('r1')
    # Only the first element of each annotation entry (the version id)
    # is checked here.
    self.assertEqual(origins[0][0], 'r1')
    self.assertEqual(origins[1][0], 'r0')

    # NOTE(review): the arguments of this assertRaises call were cut off
    # in the source; it is completed with annotate on the never-added id
    # 'rX', mirroring the sibling tests -- confirm the intended id.
    self.assertRaises(RevisionNotPresent,
                      f.annotate, 'rX')
119
f1 = self.get_file('1')
120
f1.add_lines('r0', [], ['a\n', 'b\n'])
121
f1.add_lines('r1', ['r0'], ['c\n', 'b\n'])
122
f2 = self.get_file('2')
124
self.assertTrue(f2.has_version('r0'))
125
self.assertTrue(f2.has_version('r1'))
127
self.assertRaises(RevisionNotPresent,
128
f2.join, f1, version_ids=['r3'])
131
#f3 = self.get_file('1')
132
#f3.add_lines('r0', ['a\n', 'b\n'], [])
133
#f3.add_lines('r1', ['c\n', 'b\n'], ['r0'])
134
#f4 = self.get_file('2')
136
#self.assertTrue(f4.has_version('r0'))
137
#self.assertFalse(f4.has_version('r1'))
140
f = self.get_file('1')
141
f.add_lines('r0', [], ['a\n', 'b\n'])
142
f.add_lines('r1', ['r0'], ['c\n', 'b\n'])
143
f.add_lines('rX', ['r1'], ['d\n', 'b\n'])
144
f.add_lines('rY', ['r1'], ['c\n', 'e\n'])
147
for lineno, insert, dset, text in f.walk(['rX', 'rY']):
148
lines[text] = (insert, dset)
150
# NOTE(review): unittest's assertTrue(expr, msg) treats its second argument
# as the failure message, so each call below only checks that the
# (insert, dset) tuple stored for that line is truthy; the tuple on the
# right is never compared.  assertEqual was presumably intended -- confirm
# the expected values against walk() before switching.
self.assertTrue(lines['a\n'], ('r0', set(['r1'])))
151
self.assertTrue(lines['b\n'], ('r0', set(['rY'])))
152
self.assertTrue(lines['c\n'], ('r1', set(['rX'])))
153
self.assertTrue(lines['d\n'], ('rX', set([])))
154
self.assertTrue(lines['e\n'], ('rY', set([])))
156
def test_detection(self):
    """Check that corrupted versioned files are detected.

    Two fixtures are exercised: one from get_file_corrupted_text() and
    one from get_file_corrupted_checksum().  For each, version 'v1' must
    still read back as 'hello\\n', while get_text(), get_lines(),
    iterating get_iter() on 'v2', and check() must all raise
    errors.WeaveInvalidChecksum.
    """
    # Test weaves detect corruption.
    #
    # Weaves contain a checksum of their texts.
    # When a text is extracted, this checksum should be verified.

    w = self.get_file_corrupted_text()

    self.assertEqual('hello\n', w.get_text('v1'))
    self.assertRaises(errors.WeaveInvalidChecksum, w.get_text, 'v2')
    self.assertRaises(errors.WeaveInvalidChecksum, w.get_lines, 'v2')
    # get_iter() is wrapped in list() so the iterator is actually consumed.
    self.assertRaises(errors.WeaveInvalidChecksum, list, w.get_iter('v2'))
    self.assertRaises(errors.WeaveInvalidChecksum, w.check)

    w = self.get_file_corrupted_checksum()

    self.assertEqual('hello\n', w.get_text('v1'))
    self.assertRaises(errors.WeaveInvalidChecksum, w.get_text, 'v2')
    self.assertRaises(errors.WeaveInvalidChecksum, w.get_lines, 'v2')
    self.assertRaises(errors.WeaveInvalidChecksum, list, w.get_iter('v2'))
    self.assertRaises(errors.WeaveInvalidChecksum, w.check)
180
def get_file_corrupted_text(self):
    """Return a versioned file with corrupt text but valid metadata.

    This base implementation always raises NotImplementedError; the
    concrete test classes provide the real fixture.
    """
    raise NotImplementedError(self.get_file_corrupted_text)
185
class TestWeave(TestCaseInTempDir, VersionedFileTestMixIn):
187
def get_file(self, name='foo'):
190
def get_file_corrupted_text(self):
192
w.add('v1', [], ['hello\n'])
193
w.add('v2', ['v1'], ['hello\n', 'there\n'])
195
# We are going to invasively corrupt the text
196
# Make sure the internals of weave are the same
197
self.assertEqual([('{', 0)
205
self.assertEqual(['f572d396fae9206628714fb2ce00f72e94f2258f'
206
, '90f265c6e75f1c8f9ab76dcf85528352c5f215ef'
211
w._weave[4] = 'There\n'
214
def get_file_corrupted_checksum(self):
    """Return a versioned file with valid text but a corrupt checksum.

    Starts from the get_file_corrupted_text() fixture, puts the text at
    weave index 4 back to 'there\\n', checks that 'v2' reads back
    correctly, then overwrites the stored sha1 at index 1 with a value
    whose first digit has been changed.
    """
    w = self.get_file_corrupted_text()
    # Undo the text corruption so that only the checksum ends up wrong.
    w._weave[4] = 'there\n'
    self.assertEqual('hello\nthere\n', w.get_text('v2'))

    # Invalid checksum, first digit changed
    w._sha1s[1] = 'f0f265c6e75f1c8f9ab76dcf85528352c5f215ef'
    # test_detection() calls methods on the result, so it must be returned.
    return w
225
class TestKnit(TestCaseInTempDir, VersionedFileTestMixIn):
227
def get_file(self, name='foo'):
    """Return a KnitVersionedFile for `name` in the current directory.

    The knit is opened through a LocalTransport rooted at '.', with
    mode 'w', a KnitAnnotateFactory and delta=True.
    """
    return KnitVersionedFile(LocalTransport('.'),
                             name, 'w', KnitAnnotateFactory(), delta=True)
231
def get_file_corrupted_text(self):
232
knit = self.get_file()
233
knit.add_lines('v1', [], ['hello\n'])
234
knit.add_lines('v2', ['v1'], ['hello\n', 'there\n'])