1
# Copyright (C) 2005 by Canonical Ltd
4
# Johan Rydberg <jrydberg@gnu.org>
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
11
# This program is distributed in the hope that it will be useful,
12
# but WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14
# GNU General Public License for more details.
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
22
import bzrlib.errors as errors
23
from bzrlib.errors import (
25
RevisionAlreadyPresent,
28
from bzrlib.knit import KnitVersionedFile, \
30
from bzrlib.tests import TestCaseWithTransport
31
from bzrlib.trace import mutter
32
from bzrlib.transport import get_transport
33
from bzrlib.transport.memory import MemoryTransport
34
import bzrlib.versionedfile as versionedfile
35
from bzrlib.weave import WeaveFile
36
from bzrlib.weavefile import read_weave
39
class VersionedFileTestMixIn(object):
40
"""A mixin test class for testing VersionedFiles.
42
This is not an adaptor-style test at this point because
43
there's no dynamic substitution of versioned file implementations,
44
they are strictly controlled by their owning repositories.
49
f.add_lines('r0', [], ['a\n', 'b\n'])
50
f.add_lines('r1', ['r0'], ['b\n', 'c\n'])
52
versions = f.versions()
53
self.assertTrue('r0' in versions)
54
self.assertTrue('r1' in versions)
55
self.assertEquals(f.get_lines('r0'), ['a\n', 'b\n'])
56
self.assertEquals(f.get_text('r0'), 'a\nb\n')
57
self.assertEquals(f.get_lines('r1'), ['b\n', 'c\n'])
58
self.assertEqual(2, len(f))
59
self.assertEqual(2, f.num_versions())
61
self.assertRaises(RevisionNotPresent,
62
f.add_lines, 'r2', ['foo'], [])
63
self.assertRaises(RevisionAlreadyPresent,
64
f.add_lines, 'r1', [], [])
66
f = self.reopen_file()
69
def test_ancestry(self):
71
self.assertEqual([], f.get_ancestry([]))
72
f.add_lines('r0', [], ['a\n', 'b\n'])
73
f.add_lines('r1', ['r0'], ['b\n', 'c\n'])
74
f.add_lines('r2', ['r0'], ['b\n', 'c\n'])
75
f.add_lines('r3', ['r2'], ['b\n', 'c\n'])
76
f.add_lines('rM', ['r1', 'r2'], ['b\n', 'c\n'])
77
self.assertEqual([], f.get_ancestry([]))
78
versions = f.get_ancestry(['rM'])
79
# there are some possibilities:
84
r0 = versions.index('r0')
85
r1 = versions.index('r1')
86
r2 = versions.index('r2')
87
self.assertFalse('r3' in versions)
88
rM = versions.index('rM')
89
self.assertTrue(r0 < r1)
90
self.assertTrue(r0 < r2)
91
self.assertTrue(r1 < rM)
92
self.assertTrue(r2 < rM)
94
self.assertRaises(RevisionNotPresent,
95
f.get_ancestry, ['rM', 'rX'])
97
def test_clear_cache(self):
99
# on a new file it should not error
101
# and after adding content, doing a clear_cache and a get should work.
102
f.add_lines('0', [], ['a'])
104
self.assertEqual(['a'], f.get_lines('0'))
106
def test_clone_text(self):
108
f.add_lines('r0', [], ['a\n', 'b\n'])
109
f.clone_text('r1', 'r0', ['r0'])
111
self.assertEquals(f.get_lines('r1'), f.get_lines('r0'))
112
self.assertEquals(f.get_lines('r1'), ['a\n', 'b\n'])
113
self.assertEquals(f.get_parents('r1'), ['r0'])
115
self.assertRaises(RevisionNotPresent,
116
f.clone_text, 'r2', 'rX', [])
117
self.assertRaises(RevisionAlreadyPresent,
118
f.clone_text, 'r1', 'r0', [])
120
verify_file(self.reopen_file())
122
def test_create_empty(self):
124
f.add_lines('0', [], ['a\n'])
125
new_f = f.create_empty('t', MemoryTransport())
126
# smoke test, specific types should check it is honoured correctly for
127
# non type attributes
128
self.assertEqual([], new_f.versions())
129
self.assertTrue(isinstance(new_f, f.__class__))
131
def test_copy_to(self):
133
f.add_lines('0', [], ['a\n'])
134
t = MemoryTransport()
136
for suffix in f.__class__.get_suffixes():
137
self.assertTrue(t.has('foo' + suffix))
139
def test_get_suffixes(self):
142
self.assertEqual(f.__class__.get_suffixes(), f.__class__.get_suffixes())
143
# and should be a list
144
self.assertTrue(isinstance(f.__class__.get_suffixes(), list))
146
def test_get_graph(self):
148
f.add_lines('v1', [], ['hello\n'])
149
f.add_lines('v2', ['v1'], ['hello\n', 'world\n'])
150
f.add_lines('v3', ['v2'], ['hello\n', 'cruel\n', 'world\n'])
151
self.assertEqual({'v1': [],
156
def test_get_parents(self):
158
f.add_lines('r0', [], ['a\n', 'b\n'])
159
f.add_lines('r1', [], ['a\n', 'b\n'])
160
f.add_lines('r2', [], ['a\n', 'b\n'])
161
f.add_lines('r3', [], ['a\n', 'b\n'])
162
f.add_lines('m', ['r0', 'r1', 'r2', 'r3'], ['a\n', 'b\n'])
163
self.assertEquals(f.get_parents('m'), ['r0', 'r1', 'r2', 'r3'])
165
self.assertRaises(RevisionNotPresent,
168
def test_annotate(self):
170
f.add_lines('r0', [], ['a\n', 'b\n'])
171
f.add_lines('r1', ['r0'], ['c\n', 'b\n'])
172
origins = f.annotate('r1')
173
self.assertEquals(origins[0][0], 'r1')
174
self.assertEquals(origins[1][0], 'r0')
176
self.assertRaises(RevisionNotPresent,
180
# Tests that walk() returns all the inclusions for the requested
181
# revisions as well as the revisions' changes themselves.
# NOTE(review): the enclosing `def` line is not visible in this extract.
182
f = self.get_file('1')
183
# Build a small history: r0 is the root, r1 derives from r0, and both rX
# and rY derive from r1, each replacing a different line of r1's text.
f.add_lines('r0', [], ['a\n', 'b\n'])
184
f.add_lines('r1', ['r0'], ['c\n', 'b\n'])
185
f.add_lines('rX', ['r1'], ['d\n', 'b\n'])
186
f.add_lines('rY', ['r1'], ['c\n', 'e\n'])
189
# Index what walk() yields by line text -> (insert, dset).
# NOTE(review): `lines` is not bound in the visible lines; presumably it is
# initialised to an empty dict on a line missing from this extract.
for lineno, insert, dset, text in f.walk(['rX', 'rY']):
190
lines[text] = (insert, dset)
192
# NOTE(review): assertTrue(expr, msg) treats its second argument as the
# failure message, so these calls only check that lines[...] is truthy;
# the tuples are never compared. assertEqual looks like what was intended
# -- confirm the expected values before switching.
self.assertTrue(lines['a\n'], ('r0', set(['r1'])))
193
self.assertTrue(lines['b\n'], ('r0', set(['rY'])))
194
self.assertTrue(lines['c\n'], ('r1', set(['rX'])))
195
self.assertTrue(lines['d\n'], ('rX', set([])))
196
self.assertTrue(lines['e\n'], ('rY', set([])))
198
def test_detection(self):
    # Test that versioned files detect corruption.
    #
    # Two fixtures are checked: one from get_file_corrupted_text() and one
    # from get_file_corrupted_checksum().  In both, text 'v1' must still
    # read back intact, while any access to 'v2' (get_text, get_lines) and
    # a whole-file check() must raise errors.WeaveInvalidChecksum.
    #
    # Comments are used instead of a docstring so the test's reported
    # description is unchanged.

    # Case 1: fixture with corrupted text.
    w = self.get_file_corrupted_text()

    self.assertEqual('hello\n', w.get_text('v1'))
    self.assertRaises(errors.WeaveInvalidChecksum, w.get_text, 'v2')
    self.assertRaises(errors.WeaveInvalidChecksum, w.get_lines, 'v2')
    self.assertRaises(errors.WeaveInvalidChecksum, w.check)

    # Case 2: fixture with a corrupted checksum.
    w = self.get_file_corrupted_checksum()

    self.assertEqual('hello\n', w.get_text('v1'))
    self.assertRaises(errors.WeaveInvalidChecksum, w.get_text, 'v2')
    self.assertRaises(errors.WeaveInvalidChecksum, w.get_lines, 'v2')
    self.assertRaises(errors.WeaveInvalidChecksum, w.check)
219
def get_file_corrupted_text(self):
    """Return a versioned file with corrupt text but valid metadata.

    This base implementation is a hook for concrete test classes to
    override: it always raises NotImplementedError, passing the bound
    method itself as the exception argument.
    """
    raise NotImplementedError(self.get_file_corrupted_text)
223
def reopen_file(self, name='foo'):
    """Open the versioned file from disk again.

    This base implementation is a hook for concrete test classes to
    override: it always raises NotImplementedError, passing the bound
    method itself as the exception argument.  `name` is accepted for
    signature compatibility and is not used here.
    """
    raise NotImplementedError(self.reopen_file)
227
def test_iter_lines_added_or_present_in_versions(self):
228
# test that we get at least an equalset of the lines added by
229
# versions in the weave
230
# the ordering here is to make a tree so that dumb searches have
231
# more changes to muck up.
233
# add a base to get included
234
vf.add_lines('base', [], ['base\n'])
235
# add an ancestor to be included on one side
236
vf.add_lines('lancestor', [], ['lancestor\n'])
237
# add an ancestor to be included on the other side
238
vf.add_lines('rancestor', ['base'], ['rancestor\n'])
239
# add a child of rancestor with no eofile-nl
240
vf.add_lines('child', ['rancestor'], ['base\n', 'child\n'])
241
# add a child of lancestor and base to join the two roots
242
vf.add_lines('otherchild',
243
['lancestor', 'base'],
244
['base\n', 'lancestor\n', 'otherchild\n'])
245
def iter_with_versions(versions):
246
# now we need to see what lines are returned, and how often.
253
# iterate over the lines
254
for line in vf.iter_lines_added_or_present_in_versions(versions):
257
lines = iter_with_versions(['child', 'otherchild'])
258
# we must see child and otherchild
259
self.assertTrue(lines['child\n'] > 0)
260
self.assertTrue(lines['otherchild\n'] > 0)
261
# we don't care if we got more than that.
264
lines = iter_with_versions(None)
265
# all lines must be seen at least once
266
self.assertTrue(lines['base\n'] > 0)
267
self.assertTrue(lines['lancestor\n'] > 0)
268
self.assertTrue(lines['rancestor\n'] > 0)
269
self.assertTrue(lines['child\n'] > 0)
270
self.assertTrue(lines['otherchild\n'] > 0)
272
def test_fix_parents(self):
273
# some versioned files allow incorrect parents to be corrected after
274
# insertion - this may not fix ancestry..
275
# if they do not support it, they just do not implement it.
276
# we test this as an interface test to ensure that those that *do*
277
# implement it get it right.
279
vf.add_lines('notbase', [], [])
280
vf.add_lines('base', [], [])
282
vf.fix_parents('notbase', ['base'])
283
except NotImplementedError:
285
self.assertEqual(['base'], vf.get_parents('notbase'))
286
# open again, check it stuck.
288
self.assertEqual(['base'], vf.get_parents('notbase'))
290
def test_fix_parents_with_ghosts(self):
291
# when fixing parents, ghosts that are listed should not be ghosts
296
vf.add_lines_with_ghosts('notbase', ['base', 'stillghost'], [])
297
except NotImplementedError:
299
vf.add_lines('base', [], [])
300
vf.fix_parents('notbase', ['base', 'stillghost'])
301
self.assertEqual(['base'], vf.get_parents('notbase'))
302
# open again, check it stuck.
304
self.assertEqual(['base'], vf.get_parents('notbase'))
305
# and check the ghosts
306
self.assertEqual(['base', 'stillghost'],
307
vf.get_parents_with_ghosts('notbase'))
309
def test_add_lines_with_ghosts(self):
310
# some versioned file formats allow lines to be added with parent
311
# information that is > than that in the format. Formats that do
312
# not support this need to raise NotImplementedError on the
313
# add_lines_with_ghosts api.
315
# add a revision with ghost parents
317
vf.add_lines_with_ghosts('notbase', ['base'], [])
318
except NotImplementedError:
319
# check the other ghost apis are also not implemented
320
self.assertRaises(NotImplementedError, vf.has_ghost, 'foo')
321
self.assertRaises(NotImplementedError, vf.get_ancestry_with_ghosts, ['foo'])
322
self.assertRaises(NotImplementedError, vf.get_parents_with_ghosts, 'foo')
323
self.assertRaises(NotImplementedError, vf.get_graph_with_ghosts)
325
# test key graph related apis: get_ancestry, get_graph, get_parents
327
# - these are ghost unaware and must not reflect ghosts
328
self.assertEqual(['notbase'], vf.get_ancestry('notbase'))
329
self.assertEqual([], vf.get_parents('notbase'))
330
self.assertEqual({'notbase':[]}, vf.get_graph())
331
self.assertFalse(vf.has_version('base'))
332
# we have _with_ghost apis to give us ghost information.
333
self.assertEqual(['base', 'notbase'], vf.get_ancestry_with_ghosts(['notbase']))
334
self.assertEqual(['base'], vf.get_parents_with_ghosts('notbase'))
335
self.assertEqual({'notbase':['base']}, vf.get_graph_with_ghosts())
336
self.assertTrue(vf.has_ghost('base'))
337
# if we add something that is a ghost of another, it should correct the
338
# results of the prior apis
339
vf.add_lines('base', [], [])
340
self.assertEqual(['base', 'notbase'], vf.get_ancestry(['notbase']))
341
self.assertEqual(['base'], vf.get_parents('notbase'))
342
self.assertEqual({'base':[],
346
self.assertTrue(vf.has_version('base'))
347
# we have _with_ghost apis to give us ghost information.
348
self.assertEqual(['base', 'notbase'], vf.get_ancestry_with_ghosts(['notbase']))
349
self.assertEqual(['base'], vf.get_parents_with_ghosts('notbase'))
350
self.assertEqual({'base':[],
353
vf.get_graph_with_ghosts())
354
self.assertFalse(vf.has_ghost('base'))
357
class TestWeave(TestCaseWithTransport, VersionedFileTestMixIn):
359
def get_file(self, name='foo'):
    """Create and return a WeaveFile called `name`.

    The file is opened with create=True on a transport obtained for the
    URL that self.get_url('.') returns.
    """
    transport = get_transport(self.get_url('.'))
    return WeaveFile(name, transport, create=True)
362
def get_file_corrupted_text(self):
363
w = WeaveFile('foo', get_transport(self.get_url('.')), create=True)
364
w.add_lines('v1', [], ['hello\n'])
365
w.add_lines('v2', ['v1'], ['hello\n', 'there\n'])
367
# We are going to invasively corrupt the text
368
# Make sure the internals of weave are the same
369
self.assertEqual([('{', 0)
377
self.assertEqual(['f572d396fae9206628714fb2ce00f72e94f2258f'
378
, '90f265c6e75f1c8f9ab76dcf85528352c5f215ef'
383
w._weave[4] = 'There\n'
386
def get_file_corrupted_checksum(self):
387
w = self.get_file_corrupted_text()
389
w._weave[4] = 'there\n'
390
self.assertEqual('hello\nthere\n', w.get_text('v2'))
392
#Invalid checksum, first digit changed
393
w._sha1s[1] = 'f0f265c6e75f1c8f9ab76dcf85528352c5f215ef'
396
def reopen_file(self, name='foo'):
    """Open the WeaveFile called `name` again, without create=True.

    The transport is obtained for the URL that self.get_url('.') returns.
    """
    transport = get_transport(self.get_url('.'))
    return WeaveFile(name, transport)
399
def test_no_implicit_create(self):
400
self.assertRaises(errors.NoSuchFile,
403
get_transport(self.get_url('.')))
406
class TestKnit(TestCaseWithTransport, VersionedFileTestMixIn):
408
def get_file(self, name='foo'):
    """Create and return a KnitVersionedFile called `name`.

    The knit is opened with delta=True and create=True on a transport
    obtained for the URL that self.get_url('.') returns.
    """
    transport = get_transport(self.get_url('.'))
    return KnitVersionedFile(name, transport, delta=True, create=True)
412
def get_file_corrupted_text(self):
413
knit = self.get_file()
414
knit.add_lines('v1', [], ['hello\n'])
415
knit.add_lines('v2', ['v1'], ['hello\n', 'there\n'])
418
def reopen_file(self, name='foo'):
    """Open the KnitVersionedFile called `name` again, without create=True.

    The knit is opened with delta=True on a transport obtained for the URL
    that self.get_url('.') returns.
    """
    transport = get_transport(self.get_url('.'))
    return KnitVersionedFile(name, transport, delta=True)
421
def test_detection(self):
422
# Placeholder: prints a reminder that a corrupted-knit fixture still has to
# be written, then creates a fresh knit via self.get_file().
# NOTE(review): this is a Python 2 print statement. The extract appears to
# skip lines right after this point, so the remainder of the body is
# unknown. Presumably this overrides the mixin's test_detection for knits
# -- confirm against the full file.
print "TODO for merging: create a corrupted knit."
423
knit = self.get_file()
426
def test_no_implicit_create(self):
427
self.assertRaises(errors.NoSuchFile,
430
get_transport(self.get_url('.')))
433
class InterString(versionedfile.InterVersionedFile):
434
"""An inter-versionedfile optimised code path for strings.
436
This is for use during testing where we use strings as versionedfiles
437
so that none of the default registered interversionedfile classes will
438
match - which lets us test the match logic.
442
def is_compatible(source, target):
    """InterString is compatible with strings-as-versionedfiles.

    Returns True only when both `source` and `target` are `str` instances,
    and False otherwise.  It takes no `self`: callers in this file invoke
    it on the class as InterString.is_compatible(a, b).
    """
    return isinstance(source, str) and isinstance(target, str)
447
# TODO this and the InterRepository core logic should be consolidatable
448
# if we make the registry a separate class though we still need to
449
# test the behaviour in the active registry to catch failure-to-handle-
451
class TestInterVersionedFile(TestCaseWithTransport):
453
def test_get_default_inter_versionedfile(self):
454
# test that the InterVersionedFile.get(a, b) probes
455
# for a class where is_compatible(a, b) returns
456
# true and returns a default interversionedfile otherwise.
457
# This also tests that the default registered optimised interversionedfile
458
# classes do not barf inappropriately when a surprising versionedfile type
460
dummy_a = "VersionedFile 1."
461
dummy_b = "VersionedFile 2."
462
self.assertGetsDefaultInterVersionedFile(dummy_a, dummy_b)
464
def assertGetsDefaultInterVersionedFile(self, a, b):
465
"""Asserts that InterVersionedFile.get(a, b) -> the default."""
466
inter = versionedfile.InterVersionedFile.get(a, b)
467
self.assertEqual(versionedfile.InterVersionedFile,
469
self.assertEqual(a, inter.source)
470
self.assertEqual(b, inter.target)
472
def test_register_inter_versionedfile_class(self):
473
# test that an optimised code path provider - a
474
# InterVersionedFile subclass can be registered and unregistered
475
# and that it is correctly selected when given a versionedfile
476
# pair that it returns true on for the is_compatible static method
478
dummy_a = "VersionedFile 1."
479
dummy_b = "VersionedFile 2."
480
versionedfile.InterVersionedFile.register_optimiser(InterString)
482
# we should get the default for something InterString returns False
484
self.assertFalse(InterString.is_compatible(dummy_a, None))
485
self.assertGetsDefaultInterVersionedFile(dummy_a, None)
486
# and we should get an InterString for a pair it 'likes'
487
self.assertTrue(InterString.is_compatible(dummy_a, dummy_b))
488
inter = versionedfile.InterVersionedFile.get(dummy_a, dummy_b)
489
self.assertEqual(InterString, inter.__class__)
490
self.assertEqual(dummy_a, inter.source)
491
self.assertEqual(dummy_b, inter.target)
493
versionedfile.InterVersionedFile.unregister_optimiser(InterString)
494
# now we should get the default InterVersionedFile object again.
495
self.assertGetsDefaultInterVersionedFile(dummy_a, dummy_b)