1
# Copyright (C) 2005 by Canonical Ltd
4
# Johan Rydberg <jrydberg@gnu.org>
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
11
# This program is distributed in the hope that it will be useful,
12
# but WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14
# GNU General Public License for more details.
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
22
import bzrlib.errors as errors
23
from bzrlib.errors import (
25
RevisionAlreadyPresent,
28
from bzrlib.knit import KnitVersionedFile, \
30
from bzrlib.tests import TestCaseWithTransport
31
from bzrlib.trace import mutter
32
from bzrlib.transport import get_transport
33
from bzrlib.transport.memory import MemoryTransport
34
import bzrlib.versionedfile as versionedfile
35
from bzrlib.weave import WeaveFile
36
from bzrlib.weavefile import read_weave
39
class VersionedFileTestMixIn(object):
40
"""A mixin test class for testing VersionedFiles.
42
This is not an adaptor-style test at this point because
43
there's no dynamic substitution of versioned file implementations,
44
they are strictly controlled by their owning repositories.
49
f.add_lines('r0', [], ['a\n', 'b\n'])
50
f.add_lines('r1', ['r0'], ['b\n', 'c\n'])
52
versions = f.versions()
53
self.assertTrue('r0' in versions)
54
self.assertTrue('r1' in versions)
55
self.assertEquals(f.get_lines('r0'), ['a\n', 'b\n'])
56
self.assertEquals(f.get_text('r0'), 'a\nb\n')
57
self.assertEquals(f.get_lines('r1'), ['b\n', 'c\n'])
58
self.assertEqual(2, len(f))
59
self.assertEqual(2, f.num_versions())
61
self.assertRaises(RevisionNotPresent,
62
f.add_lines, 'r2', ['foo'], [])
63
self.assertRaises(RevisionAlreadyPresent,
64
f.add_lines, 'r1', [], [])
66
f = self.reopen_file()
69
def test_ancestry(self):
    """Check get_ancestry() on an empty file and on a small merge graph.

    The graph built here is: r1 and r2 both descend from r0, r3 descends
    from r2, and rM merges r1 and r2.
    """
    # Bind the file under test; every statement below operates on it.
    f = self.get_file()
    # Asking for the ancestry of no versions yields nothing, both before
    # and after content has been added.
    self.assertEqual([], f.get_ancestry([]))
    f.add_lines('r0', [], ['a\n', 'b\n'])
    f.add_lines('r1', ['r0'], ['b\n', 'c\n'])
    f.add_lines('r2', ['r0'], ['b\n', 'c\n'])
    f.add_lines('r3', ['r2'], ['b\n', 'c\n'])
    f.add_lines('rM', ['r1', 'r2'], ['b\n', 'c\n'])
    self.assertEqual([], f.get_ancestry([]))
    # Only membership is compared (via sets), not ordering; r3 is not
    # expected in the ancestry of rM.
    versions = set(f.get_ancestry(['rM']))
    self.assertEqual(versions, set(['rM', 'r2', 'r1', 'r0']))

    # Including an unknown version in the request must be rejected.
    self.assertRaises(RevisionNotPresent,
                      f.get_ancestry, ['rM', 'rX'])
84
def test_clear_cache(self):
86
# on a new file it should not error
88
# and after adding content, doing a clear_cache and a get should work.
89
f.add_lines('0', [], ['a'])
91
self.assertEqual(['a'], f.get_lines('0'))
93
def test_clone_text(self):
95
f.add_lines('r0', [], ['a\n', 'b\n'])
96
f.clone_text('r1', 'r0', ['r0'])
98
self.assertEquals(f.get_lines('r1'), f.get_lines('r0'))
99
self.assertEquals(f.get_lines('r1'), ['a\n', 'b\n'])
100
self.assertEquals(f.get_parents('r1'), ['r0'])
102
self.assertRaises(RevisionNotPresent,
103
f.clone_text, 'r2', 'rX', [])
104
self.assertRaises(RevisionAlreadyPresent,
105
f.clone_text, 'r1', 'r0', [])
107
verify_file(self.reopen_file())
109
def test_create_empty(self):
    """create_empty() gives back a version-less file of the same class."""
    # Bind the file under test; it gets one version so that the emptiness
    # of the new file is not trivially inherited from an empty source.
    f = self.get_file()
    f.add_lines('0', [], ['a\n'])
    new_f = f.create_empty('t', MemoryTransport())
    # smoke test, specific types should check it is honoured correctly for
    # non type attributes
    self.assertEqual([], new_f.versions())
    self.assertTrue(isinstance(new_f, f.__class__))
118
def test_copy_to(self):
120
f.add_lines('0', [], ['a\n'])
121
t = MemoryTransport()
123
for suffix in f.__class__.get_suffixes():
124
self.assertTrue(t.has('foo' + suffix))
126
def test_get_suffixes(self):
    """get_suffixes() on the file's class is repeatable and returns a list."""
    # Bind the file under test; only its class is inspected below.
    f = self.get_file()
    file_class = f.__class__
    # two calls must agree with each other
    self.assertEqual(file_class.get_suffixes(), file_class.get_suffixes())
    # and should be a list
    self.assertTrue(isinstance(file_class.get_suffixes(), list))
133
def test_get_graph(self):
135
f.add_lines('v1', [], ['hello\n'])
136
f.add_lines('v2', ['v1'], ['hello\n', 'world\n'])
137
f.add_lines('v3', ['v2'], ['hello\n', 'cruel\n', 'world\n'])
138
self.assertEqual({'v1': [],
143
def test_get_parents(self):
145
f.add_lines('r0', [], ['a\n', 'b\n'])
146
f.add_lines('r1', [], ['a\n', 'b\n'])
147
f.add_lines('r2', [], ['a\n', 'b\n'])
148
f.add_lines('r3', [], ['a\n', 'b\n'])
149
f.add_lines('m', ['r0', 'r1', 'r2', 'r3'], ['a\n', 'b\n'])
150
self.assertEquals(f.get_parents('m'), ['r0', 'r1', 'r2', 'r3'])
152
self.assertRaises(RevisionNotPresent,
155
def test_annotate(self):
157
f.add_lines('r0', [], ['a\n', 'b\n'])
158
f.add_lines('r1', ['r0'], ['c\n', 'b\n'])
159
origins = f.annotate('r1')
160
self.assertEquals(origins[0][0], 'r1')
161
self.assertEquals(origins[1][0], 'r0')
163
self.assertRaises(RevisionNotPresent,
167
f = self.get_file('1')
168
f.add_lines('r0', [], ['a\n', 'b\n'])
169
f.add_lines('r1', ['r0'], ['c\n', 'b\n'])
170
f.add_lines('rX', ['r1'], ['d\n', 'b\n'])
171
f.add_lines('rY', ['r1'], ['c\n', 'e\n'])
174
for lineno, insert, dset, text in f.walk(['rX', 'rY']):
175
lines[text] = (insert, dset)
177
self.assertTrue(lines['a\n'], ('r0', set(['r1'])))
178
self.assertTrue(lines['b\n'], ('r0', set(['rY'])))
179
self.assertTrue(lines['c\n'], ('r1', set(['rX'])))
180
self.assertTrue(lines['d\n'], ('rX', set([])))
181
self.assertTrue(lines['e\n'], ('rY', set([])))
183
def test_detection(self):
    """Test weaves detect corruption.

    Weaves contain a checksum of their texts.  The same expectations are
    applied to a file with corrupted text and to a file with a corrupted
    checksum: the intact version 'v1' still reads back as 'hello\\n',
    while get_text('v2'), get_lines('v2') and check() must each raise
    errors.WeaveInvalidChecksum.
    """
    # The factories are called one at a time inside the loop, so the
    # corrupted-text file is fully checked before the corrupted-checksum
    # file is built.
    for make_corrupted in (self.get_file_corrupted_text,
                           self.get_file_corrupted_checksum):
        w = make_corrupted()

        self.assertEqual('hello\n', w.get_text('v1'))
        self.assertRaises(errors.WeaveInvalidChecksum, w.get_text, 'v2')
        self.assertRaises(errors.WeaveInvalidChecksum, w.get_lines, 'v2')
        self.assertRaises(errors.WeaveInvalidChecksum, w.check)
204
def get_file_corrupted_text(self):
    """Return a versioned file with corrupt text but valid metadata.

    This base version is only a placeholder: it always raises
    NotImplementedError, with the bound method itself as the exception
    argument so the failing hook can be identified.
    """
    raise NotImplementedError(self.get_file_corrupted_text)
208
def reopen_file(self, name='foo'):
    """Open the versioned file from disk again.

    This base version is only a placeholder: it ignores ``name`` and
    always raises NotImplementedError, with the bound method itself as
    the exception argument.
    """
    raise NotImplementedError(self.reopen_file)
213
class TestWeave(TestCaseWithTransport, VersionedFileTestMixIn):
215
def get_file(self, name='foo'):
    """Return a WeaveFile called ``name``, opened with create=True.

    The file lives on a transport obtained for this test's own URL ('.').
    """
    transport = get_transport(self.get_url('.'))
    return WeaveFile(name, transport, create=True)
218
def get_file_corrupted_text(self):
219
w = WeaveFile('foo', get_transport(self.get_url('.')), create=True)
220
w.add_lines('v1', [], ['hello\n'])
221
w.add_lines('v2', ['v1'], ['hello\n', 'there\n'])
223
# We are going to invasively corrupt the text
224
# Make sure the internals of weave are the same
225
self.assertEqual([('{', 0)
233
self.assertEqual(['f572d396fae9206628714fb2ce00f72e94f2258f'
234
, '90f265c6e75f1c8f9ab76dcf85528352c5f215ef'
239
w._weave[4] = 'There\n'
242
def get_file_corrupted_checksum(self):
    """Return a weave whose text is intact but whose stored sha1 is wrong.

    Starts from the corrupted-text weave, puts the original line back so
    that 'v2' reads correctly again, then overwrites the recorded sha1 of
    the second version with a value whose first digit was changed.
    """
    w = self.get_file_corrupted_text()

    # Undo the text corruption and confirm the text now reads back intact.
    w._weave[4] = 'there\n'
    self.assertEqual('hello\nthere\n', w.get_text('v2'))

    # Invalid checksum, first digit changed
    w._sha1s[1] = 'f0f265c6e75f1c8f9ab76dcf85528352c5f215ef'
    # Hand the weave back: test_detection calls methods on the result.
    return w
252
def reopen_file(self, name='foo'):
    """Open the WeaveFile called ``name`` again without the create flag.

    Uses a fresh transport for this test's own URL ('.').
    """
    transport = get_transport(self.get_url('.'))
    return WeaveFile(name, transport)
255
def test_no_implicit_create(self):
256
self.assertRaises(errors.NoSuchFile,
259
get_transport(self.get_url('.')))
262
class TestKnit(TestCaseWithTransport, VersionedFileTestMixIn):
264
def get_file(self, name='foo'):
    """Return a KnitVersionedFile called ``name`` with delta=True, create=True.

    The file lives on a transport obtained for this test's own URL ('.').
    """
    transport = get_transport(self.get_url('.'))
    return KnitVersionedFile(name, transport, delta=True, create=True)
268
def get_file_corrupted_text(self):
    """Return a knit holding versions 'v1' and 'v2'.

    NOTE(review): nothing is actually corrupted here yet; the knit is
    only populated with the same two versions the weave variant uses.
    """
    knit = self.get_file()
    knit.add_lines('v1', [], ['hello\n'])
    knit.add_lines('v2', ['v1'], ['hello\n', 'there\n'])
    # Return the populated file so callers receive something usable.
    return knit
274
def reopen_file(self, name='foo'):
    """Open the KnitVersionedFile called ``name`` again, without create.

    Uses a fresh transport for this test's own URL ('.') and delta=True.
    """
    transport = get_transport(self.get_url('.'))
    return KnitVersionedFile(name, transport, delta=True)
277
def test_detection(self):
    """Placeholder override: corruption detection for knits is still a TODO.

    Emits a reminder and creates a knit via get_file().
    """
    # Parenthesised single-argument form prints the same text whether
    # print is a statement or a function.
    print("TODO for merging: create a corrupted knit.")
    knit = self.get_file()
282
def test_no_implicit_create(self):
283
self.assertRaises(errors.NoSuchFile,
286
get_transport(self.get_url('.')))
289
class InterString(versionedfile.InterVersionedFile):
290
"""An inter-versionedfile optimised code path for strings.
292
This is for use during testing where we use strings as versionedfiles
293
so that none of the default registered interversionedfile classes will
294
match - which lets us test the match logic.
298
def is_compatible(source, target):
    """InterString is compatible with strings-as-versionedfiles.

    Returns True only when both ``source`` and ``target`` are ``str``
    instances, and False otherwise.
    """
    if not isinstance(source, str):
        return False
    return isinstance(target, str)
303
# TODO this and the InterRepository core logic should be consolidatable
304
# if we make the registry a separate class though we still need to
305
# test the behaviour in the active registry to catch failure-to-handle-
307
class TestInterVersionedFile(TestCaseWithTransport):
309
def test_get_default_inter_versionedfile(self):
    """InterVersionedFile.get() falls back to the default for odd inputs."""
    # test that the InterVersionedFile.get(a, b) probes
    # for a class where is_compatible(a, b) returns
    # true and returns a default interversionedfile otherwise.
    # This also tests that the default registered optimised
    # interversionedfile classes do not barf inappropriately when a
    # surprising versionedfile type (here: plain strings) is passed in.
    dummy_a = "VersionedFile 1."
    dummy_b = "VersionedFile 2."
    self.assertGetsDefaultInterVersionedFile(dummy_a, dummy_b)
320
def assertGetsDefaultInterVersionedFile(self, a, b):
    """Asserts that InterVersionedFile.get(a, b) -> the default.

    Checks that the returned object's class is exactly
    versionedfile.InterVersionedFile and that its ``source`` and
    ``target`` attributes are the ``a`` and ``b`` passed in.
    """
    inter = versionedfile.InterVersionedFile.get(a, b)
    # Exact class comparison, not isinstance: an optimised subclass
    # must not be accepted as "the default".
    self.assertEqual(versionedfile.InterVersionedFile,
                     inter.__class__)
    self.assertEqual(a, inter.source)
    self.assertEqual(b, inter.target)
328
def test_register_inter_versionedfile_class(self):
    """Optimisers can be registered, are selected, and can be removed."""
    # test that a optimised code path provider - a
    # InterVersionedFile subclass can be registered and unregistered
    # and that it is correctly selected when given a versionedfile
    # pair that it returns true on for the is_compatible static method
    dummy_a = "VersionedFile 1."
    dummy_b = "VersionedFile 2."
    versionedfile.InterVersionedFile.register_optimiser(InterString)
    # try/finally so a failing assertion cannot leave InterString
    # registered for the rest of the run.
    try:
        # we should get the default for a pair InterString rejects
        self.assertFalse(InterString.is_compatible(dummy_a, None))
        self.assertGetsDefaultInterVersionedFile(dummy_a, None)
        # and we should get an InterString for a pair it 'likes'
        self.assertTrue(InterString.is_compatible(dummy_a, dummy_b))
        inter = versionedfile.InterVersionedFile.get(dummy_a, dummy_b)
        self.assertEqual(InterString, inter.__class__)
        self.assertEqual(dummy_a, inter.source)
        self.assertEqual(dummy_b, inter.target)
    finally:
        versionedfile.InterVersionedFile.unregister_optimiser(InterString)
    # now we should get the default InterVersionedFile object again.
    self.assertGetsDefaultInterVersionedFile(dummy_a, dummy_b)