1
# Copyright (C) 2005 by Canonical Ltd
4
# Johan Rydberg <jrydberg@gnu.org>
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
11
# This program is distributed in the hope that it will be useful,
12
# but WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14
# GNU General Public License for more details.
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
20
# Remaining to do is to figure out if get_graph should return a simple
21
# map, or a graph object of some kind.
25
import bzrlib.errors as errors
26
from bzrlib.errors import RevisionNotPresent, \
27
RevisionAlreadyPresent
28
from bzrlib.knit import KnitVersionedFile, \
30
from bzrlib.tests import TestCaseInTempDir
31
from bzrlib.trace import mutter
32
from bzrlib.transport.local import LocalTransport
33
from bzrlib.weave import Weave
36
class VersionedFileTestMixIn(object):
37
"""A mixin test class for testing VersionedFiles.
39
This is not an adaptor-style test at this point because
40
there's no dynamic substitution of versioned file implementations,
41
they are strictly controlled by their owning repositories.
46
f.add_lines('r0', [], ['a\n', 'b\n'])
47
f.add_lines('r1', ['r0'], ['b\n', 'c\n'])
48
versions = f.versions()
49
self.assertTrue('r0' in versions)
50
self.assertTrue('r1' in versions)
51
self.assertEquals(f.get_lines('r0'), ['a\n', 'b\n'])
52
self.assertEquals(f.get_text('r0'), 'a\nb\n')
53
self.assertEquals(f.get_lines('r1'), ['b\n', 'c\n'])
55
self.assertRaises(RevisionNotPresent,
56
f.add_lines, 'r2', ['foo'], [])
57
self.assertRaises(RevisionAlreadyPresent,
58
f.add_lines, 'r1', [], [])
60
def test_ancestry(self):
62
f.add_lines('r0', [], ['a\n', 'b\n'])
63
f.add_lines('r1', ['r0'], ['b\n', 'c\n'])
64
f.add_lines('r2', ['r0'], ['b\n', 'c\n'])
65
f.add_lines('r3', ['r2'], ['b\n', 'c\n'])
66
f.add_lines('rM', ['r1', 'r2'], ['b\n', 'c\n'])
67
versions = set(f.get_ancestry(['rM']))
68
self.assertEquals(versions, set(['rM', 'r2', 'r1', 'r0']))
70
self.assertRaises(RevisionNotPresent,
71
f.get_ancestry, ['rM', 'rX'])
73
def test_clone_text(self):
75
f.add_lines('r0', [], ['a\n', 'b\n'])
76
f.clone_text('r1', 'r0', ['r0'])
77
self.assertEquals(f.get_lines('r1'), f.get_lines('r0'))
78
self.assertEquals(f.get_lines('r1'), ['a\n', 'b\n'])
79
self.assertEquals(f.get_parents('r1'), ['r0'])
81
self.assertRaises(RevisionNotPresent,
82
f.clone_text, 'r2', 'rX', [])
83
self.assertRaises(RevisionAlreadyPresent,
84
f.clone_text, 'r1', 'r0', [])
86
def test_get_parents(self):
88
f.add_lines('r0', [], ['a\n', 'b\n'])
89
f.add_lines('r1', [], ['a\n', 'b\n'])
90
f.add_lines('r2', [], ['a\n', 'b\n'])
91
f.add_lines('r3', [], ['a\n', 'b\n'])
92
f.add_lines('m', ['r0', 'r1', 'r2', 'r3'], ['a\n', 'b\n'])
93
self.assertEquals(f.get_parents('m'), ['r0', 'r1', 'r2', 'r3'])
95
self.assertRaises(RevisionNotPresent,
98
def test_annotate(self):
100
f.add_lines('r0', [], ['a\n', 'b\n'])
101
f.add_lines('r1', ['r0'], ['c\n', 'b\n'])
102
origins = f.annotate('r1')
103
self.assertEquals(origins[0][0], 'r1')
104
self.assertEquals(origins[1][0], 'r0')
106
self.assertRaises(RevisionNotPresent,
110
f1 = self.get_file('1')
111
f1.add_lines('r0', [], ['a\n', 'b\n'])
112
f1.add_lines('r1', ['r0'], ['c\n', 'b\n'])
113
f2 = self.get_file('2')
115
self.assertTrue(f2.has_version('r0'))
116
self.assertTrue(f2.has_version('r1'))
118
self.assertRaises(RevisionNotPresent,
119
f2.join, f1, version_ids=['r3'])
122
#f3 = self.get_file('1')
123
#f3.add_lines('r0', ['a\n', 'b\n'], [])
124
#f3.add_lines('r1', ['c\n', 'b\n'], ['r0'])
125
#f4 = self.get_file('2')
127
#self.assertTrue(f4.has_version('r0'))
128
#self.assertFalse(f4.has_version('r1'))
131
f = self.get_file('1')
132
f.add_lines('r0', [], ['a\n', 'b\n'])
133
f.add_lines('r1', ['r0'], ['c\n', 'b\n'])
134
f.add_lines('rX', ['r1'], ['d\n', 'b\n'])
135
f.add_lines('rY', ['r1'], ['c\n', 'e\n'])
138
for lineno, insert, dset, text in f.walk(['rX', 'rY']):
139
lines[text] = (insert, dset)
141
self.assertTrue(lines['a\n'], ('r0', set(['r1'])))
142
self.assertTrue(lines['b\n'], ('r0', set(['rY'])))
143
self.assertTrue(lines['c\n'], ('r1', set(['rX'])))
144
self.assertTrue(lines['d\n'], ('rX', set([])))
145
self.assertTrue(lines['e\n'], ('rY', set([])))
147
def test_detection(self):
    # Versioned files must notice corruption when a text is read back.
    #
    # Two damaged fixtures are checked in turn: the one returned by
    # get_file_corrupted_text() and the one returned by
    # get_file_corrupted_checksum().  In each, 'v1' must still be
    # readable, while every way of reading 'v2' -- and check() itself --
    # must raise WeaveInvalidChecksum.
    #
    # Each fixture is built only when its turn comes, so the second one
    # is not created until the first has been fully examined.
    fixture_factories = (
        self.get_file_corrupted_text,
        self.get_file_corrupted_checksum,
    )
    for make_damaged in fixture_factories:
        damaged = make_damaged()

        # The undamaged version is still served normally.
        self.assertEqual('hello\n', damaged.get_text('v1'))

        # Every access path to the damaged version reports the problem.
        bad_checksum = errors.WeaveInvalidChecksum
        self.assertRaises(bad_checksum, damaged.get_text, 'v2')
        self.assertRaises(bad_checksum, damaged.get_lines, 'v2')
        self.assertRaises(bad_checksum, list, damaged.get_iter('v2'))
        self.assertRaises(bad_checksum, damaged.check)
171
def get_file_corrupted_text(self):
    """Return a versioned file with corrupt text but valid metadata.

    This base version only raises NotImplementedError, passing the bound
    method itself as the exception argument so the message identifies
    which hook was left unimplemented.
    """
    missing_hook = self.get_file_corrupted_text
    raise NotImplementedError(missing_hook)
176
class TestWeave(TestCaseInTempDir, VersionedFileTestMixIn):
178
def get_file(self, name='foo'):
181
def get_file_corrupted_text(self):
183
w.add('v1', [], ['hello\n'])
184
w.add('v2', ['v1'], ['hello\n', 'there\n'])
186
# We are going to invasively corrupt the text
187
# Make sure the internals of weave are the same
188
self.assertEqual([('{', 0)
196
self.assertEqual(['f572d396fae9206628714fb2ce00f72e94f2258f'
197
, '90f265c6e75f1c8f9ab76dcf85528352c5f215ef'
202
w._weave[4] = 'There\n'
205
def get_file_corrupted_checksum(self):
206
w = self.get_file_corrupted_text()
208
w._weave[4] = 'there\n'
209
self.assertEqual('hello\nthere\n', w.get_text('v2'))
211
#Invalid checksum, first digit changed
212
w._sha1s[1] = 'f0f265c6e75f1c8f9ab76dcf85528352c5f215ef'
216
class TestKnit(TestCaseInTempDir, VersionedFileTestMixIn):
218
def get_file(self, name='foo'):
    """Return a KnitVersionedFile called `name` for the tests to use.

    The knit is created over a LocalTransport for '.', with a fresh
    KnitAnnotateFactory and delta=True.  The literal 'w' is passed as
    the third positional argument (presumably the access mode -- confirm
    against KnitVersionedFile's signature).
    """
    transport = LocalTransport('.')
    factory = KnitAnnotateFactory()
    return KnitVersionedFile(transport, name, 'w', factory, delta=True)
222
def get_file_corrupted_text(self):
223
knit = self.get_file()
224
knit.add_lines('v1', [], ['hello\n'])
225
knit.add_lines('v2', ['v1'], ['hello\n', 'there\n'])