/brz/remove-bazaar

To get this branch, use:
bzr branch http://gegoxaren.bato24.eu/bzr/brz/remove-bazaar

« back to all changes in this revision

Viewing changes to bzrlib/tests/test_versionedfile.py

Add versionedfile.fix_parents api for correcting data post hoc.

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
# Copyright (C) 2005 by Canonical Ltd
 
2
#
 
3
# Authors:
 
4
#   Johan Rydberg <jrydberg@gnu.org>
 
5
#
 
6
# This program is free software; you can redistribute it and/or modify
 
7
# it under the terms of the GNU General Public License as published by
 
8
# the Free Software Foundation; either version 2 of the License, or
 
9
# (at your option) any later version.
 
10
 
 
11
# This program is distributed in the hope that it will be useful,
 
12
# but WITHOUT ANY WARRANTY; without even the implied warranty of
 
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
14
# GNU General Public License for more details.
 
15
 
 
16
# You should have received a copy of the GNU General Public License
 
17
# along with this program; if not, write to the Free Software
 
18
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
 
19
 
 
20
 
 
21
import bzrlib
 
22
import bzrlib.errors as errors
 
23
from bzrlib.errors import (
 
24
                           RevisionNotPresent, 
 
25
                           RevisionAlreadyPresent,
 
26
                           WeaveParentMismatch
 
27
                           )
 
28
from bzrlib.knit import KnitVersionedFile, \
 
29
     KnitAnnotateFactory
 
30
from bzrlib.tests import TestCaseWithTransport
 
31
from bzrlib.trace import mutter
 
32
from bzrlib.transport import get_transport
 
33
from bzrlib.transport.memory import MemoryTransport
 
34
import bzrlib.versionedfile as versionedfile
 
35
from bzrlib.weave import WeaveFile
 
36
from bzrlib.weavefile import read_weave
 
37
 
 
38
 
 
39
class VersionedFileTestMixIn(object):
 
40
    """A mixin test class for testing VersionedFiles.
 
41
 
 
42
    This is not an adaptor-style test at this point because
 
43
    theres no dynamic substitution of versioned file implementations,
 
44
    they are strictly controlled by their owning repositories.
 
45
    """
 
46
 
 
47
    def test_add(self):
 
48
        f = self.get_file()
 
49
        f.add_lines('r0', [], ['a\n', 'b\n'])
 
50
        f.add_lines('r1', ['r0'], ['b\n', 'c\n'])
 
51
        def verify_file(f):
 
52
            versions = f.versions()
 
53
            self.assertTrue('r0' in versions)
 
54
            self.assertTrue('r1' in versions)
 
55
            self.assertEquals(f.get_lines('r0'), ['a\n', 'b\n'])
 
56
            self.assertEquals(f.get_text('r0'), 'a\nb\n')
 
57
            self.assertEquals(f.get_lines('r1'), ['b\n', 'c\n'])
 
58
            self.assertEqual(2, len(f))
 
59
            self.assertEqual(2, f.num_versions())
 
60
    
 
61
            self.assertRaises(RevisionNotPresent,
 
62
                f.add_lines, 'r2', ['foo'], [])
 
63
            self.assertRaises(RevisionAlreadyPresent,
 
64
                f.add_lines, 'r1', [], [])
 
65
        verify_file(f)
 
66
        f = self.reopen_file()
 
67
        verify_file(f)
 
68
 
 
69
    def test_ancestry(self):
 
70
        f = self.get_file()
 
71
        self.assertEqual([], f.get_ancestry([]))
 
72
        f.add_lines('r0', [], ['a\n', 'b\n'])
 
73
        f.add_lines('r1', ['r0'], ['b\n', 'c\n'])
 
74
        f.add_lines('r2', ['r0'], ['b\n', 'c\n'])
 
75
        f.add_lines('r3', ['r2'], ['b\n', 'c\n'])
 
76
        f.add_lines('rM', ['r1', 'r2'], ['b\n', 'c\n'])
 
77
        self.assertEqual([], f.get_ancestry([]))
 
78
        versions = f.get_ancestry(['rM'])
 
79
        # there are some possibilities:
 
80
        # r0 r1 r2 rM r3
 
81
        # r0 r1 r2 r3 rM
 
82
        # etc
 
83
        # so we check indexes
 
84
        r0 = versions.index('r0')
 
85
        r1 = versions.index('r1')
 
86
        r2 = versions.index('r2')
 
87
        self.assertFalse('r3' in versions)
 
88
        rM = versions.index('rM')
 
89
        self.assertTrue(r0 < r1)
 
90
        self.assertTrue(r0 < r2)
 
91
        self.assertTrue(r1 < rM)
 
92
        self.assertTrue(r2 < rM)
 
93
 
 
94
        self.assertRaises(RevisionNotPresent,
 
95
            f.get_ancestry, ['rM', 'rX'])
 
96
        
 
97
    def test_clear_cache(self):
 
98
        f = self.get_file()
 
99
        # on a new file it should not error
 
100
        f.clear_cache()
 
101
        # and after adding content, doing a clear_cache and a get should work.
 
102
        f.add_lines('0', [], ['a'])
 
103
        f.clear_cache()
 
104
        self.assertEqual(['a'], f.get_lines('0'))
 
105
 
 
106
    def test_clone_text(self):
 
107
        f = self.get_file()
 
108
        f.add_lines('r0', [], ['a\n', 'b\n'])
 
109
        f.clone_text('r1', 'r0', ['r0'])
 
110
        def verify_file(f):
 
111
            self.assertEquals(f.get_lines('r1'), f.get_lines('r0'))
 
112
            self.assertEquals(f.get_lines('r1'), ['a\n', 'b\n'])
 
113
            self.assertEquals(f.get_parents('r1'), ['r0'])
 
114
    
 
115
            self.assertRaises(RevisionNotPresent,
 
116
                f.clone_text, 'r2', 'rX', [])
 
117
            self.assertRaises(RevisionAlreadyPresent,
 
118
                f.clone_text, 'r1', 'r0', [])
 
119
        verify_file(f)
 
120
        verify_file(self.reopen_file())
 
121
 
 
122
    def test_create_empty(self):
 
123
        f = self.get_file()
 
124
        f.add_lines('0', [], ['a\n'])
 
125
        new_f = f.create_empty('t', MemoryTransport())
 
126
        # smoke test, specific types should check it is honoured correctly for
 
127
        # non type attributes
 
128
        self.assertEqual([], new_f.versions())
 
129
        self.assertTrue(isinstance(new_f, f.__class__))
 
130
 
 
131
    def test_copy_to(self):
 
132
        f = self.get_file()
 
133
        f.add_lines('0', [], ['a\n'])
 
134
        t = MemoryTransport()
 
135
        f.copy_to('foo', t)
 
136
        for suffix in f.__class__.get_suffixes():
 
137
            self.assertTrue(t.has('foo' + suffix))
 
138
 
 
139
    def test_get_suffixes(self):
 
140
        f = self.get_file()
 
141
        # should be the same
 
142
        self.assertEqual(f.__class__.get_suffixes(), f.__class__.get_suffixes())
 
143
        # and should be a list
 
144
        self.assertTrue(isinstance(f.__class__.get_suffixes(), list))
 
145
 
 
146
    def test_get_graph(self):
 
147
        f = self.get_file()
 
148
        f.add_lines('v1', [], ['hello\n'])
 
149
        f.add_lines('v2', ['v1'], ['hello\n', 'world\n'])
 
150
        f.add_lines('v3', ['v2'], ['hello\n', 'cruel\n', 'world\n'])
 
151
        self.assertEqual({'v1': [],
 
152
                          'v2': ['v1'],
 
153
                          'v3': ['v2']},
 
154
                         f.get_graph())
 
155
 
 
156
    def test_get_parents(self):
 
157
        f = self.get_file()
 
158
        f.add_lines('r0', [], ['a\n', 'b\n'])
 
159
        f.add_lines('r1', [], ['a\n', 'b\n'])
 
160
        f.add_lines('r2', [], ['a\n', 'b\n'])
 
161
        f.add_lines('r3', [], ['a\n', 'b\n'])
 
162
        f.add_lines('m', ['r0', 'r1', 'r2', 'r3'], ['a\n', 'b\n'])
 
163
        self.assertEquals(f.get_parents('m'), ['r0', 'r1', 'r2', 'r3'])
 
164
 
 
165
        self.assertRaises(RevisionNotPresent,
 
166
            f.get_parents, 'y')
 
167
 
 
168
    def test_annotate(self):
 
169
        f = self.get_file()
 
170
        f.add_lines('r0', [], ['a\n', 'b\n'])
 
171
        f.add_lines('r1', ['r0'], ['c\n', 'b\n'])
 
172
        origins = f.annotate('r1')
 
173
        self.assertEquals(origins[0][0], 'r1')
 
174
        self.assertEquals(origins[1][0], 'r0')
 
175
 
 
176
        self.assertRaises(RevisionNotPresent,
 
177
            f.annotate, 'foo')
 
178
 
 
179
    def test_walk(self):
 
180
        # tests that walk returns all the inclusions for the requested
 
181
        # revisions as well as the revisions changes themselves.
 
182
        f = self.get_file('1')
 
183
        f.add_lines('r0', [], ['a\n', 'b\n'])
 
184
        f.add_lines('r1', ['r0'], ['c\n', 'b\n'])
 
185
        f.add_lines('rX', ['r1'], ['d\n', 'b\n'])
 
186
        f.add_lines('rY', ['r1'], ['c\n', 'e\n'])
 
187
 
 
188
        lines = {}
 
189
        for lineno, insert, dset, text in f.walk(['rX', 'rY']):
 
190
            lines[text] = (insert, dset)
 
191
 
 
192
        self.assertTrue(lines['a\n'], ('r0', set(['r1'])))
 
193
        self.assertTrue(lines['b\n'], ('r0', set(['rY'])))
 
194
        self.assertTrue(lines['c\n'], ('r1', set(['rX'])))
 
195
        self.assertTrue(lines['d\n'], ('rX', set([])))
 
196
        self.assertTrue(lines['e\n'], ('rY', set([])))
 
197
 
 
198
    def test_detection(self):
 
199
        # Test weaves detect corruption.
 
200
        #
 
201
        # Weaves contain a checksum of their texts.
 
202
        # When a text is extracted, this checksum should be
 
203
        # verified.
 
204
 
 
205
        w = self.get_file_corrupted_text()
 
206
 
 
207
        self.assertEqual('hello\n', w.get_text('v1'))
 
208
        self.assertRaises(errors.WeaveInvalidChecksum, w.get_text, 'v2')
 
209
        self.assertRaises(errors.WeaveInvalidChecksum, w.get_lines, 'v2')
 
210
        self.assertRaises(errors.WeaveInvalidChecksum, w.check)
 
211
 
 
212
        w = self.get_file_corrupted_checksum()
 
213
 
 
214
        self.assertEqual('hello\n', w.get_text('v1'))
 
215
        self.assertRaises(errors.WeaveInvalidChecksum, w.get_text, 'v2')
 
216
        self.assertRaises(errors.WeaveInvalidChecksum, w.get_lines, 'v2')
 
217
        self.assertRaises(errors.WeaveInvalidChecksum, w.check)
 
218
 
 
219
    def get_file_corrupted_text(self):
 
220
        """Return a versioned file with corrupt text but valid metadata."""
 
221
        raise NotImplementedError(self.get_file_corrupted_text)
 
222
 
 
223
    def reopen_file(self, name='foo'):
 
224
        """Open the versioned file from disk again."""
 
225
        raise NotImplementedError(self.reopen_file)
 
226
 
 
227
    def test_iter_lines_added_or_present_in_versions(self):
 
228
        # test that we get at least an equalset of the lines added by
 
229
        # versions in the weave 
 
230
        # the ordering here is to make a tree so that dumb searches have
 
231
        # more changes to muck up.
 
232
        vf = self.get_file()
 
233
        # add a base to get included
 
234
        vf.add_lines('base', [], ['base\n'])
 
235
        # add a ancestor to be included on one side
 
236
        vf.add_lines('lancestor', [], ['lancestor\n'])
 
237
        # add a ancestor to be included on the other side
 
238
        vf.add_lines('rancestor', ['base'], ['rancestor\n'])
 
239
        # add a child of rancestor with no eofile-nl
 
240
        vf.add_lines('child', ['rancestor'], ['base\n', 'child\n'])
 
241
        # add a child of lancestor and base to join the two roots
 
242
        vf.add_lines('otherchild',
 
243
                     ['lancestor', 'base'],
 
244
                     ['base\n', 'lancestor\n', 'otherchild\n'])
 
245
        def iter_with_versions(versions):
 
246
            # now we need to see what lines are returned, and how often.
 
247
            lines = {'base\n':0,
 
248
                     'lancestor\n':0,
 
249
                     'rancestor\n':0,
 
250
                     'child\n':0,
 
251
                     'otherchild\n':0,
 
252
                     }
 
253
            # iterate over the lines
 
254
            for line in vf.iter_lines_added_or_present_in_versions(versions):
 
255
                lines[line] += 1
 
256
            return lines
 
257
        lines = iter_with_versions(['child', 'otherchild'])
 
258
        # we must see child and otherchild
 
259
        self.assertTrue(lines['child\n'] > 0)
 
260
        self.assertTrue(lines['otherchild\n'] > 0)
 
261
        # we dont care if we got more than that.
 
262
        
 
263
        # test all lines
 
264
        lines = iter_with_versions(None)
 
265
        # all lines must be seen at least once
 
266
        self.assertTrue(lines['base\n'] > 0)
 
267
        self.assertTrue(lines['lancestor\n'] > 0)
 
268
        self.assertTrue(lines['rancestor\n'] > 0)
 
269
        self.assertTrue(lines['child\n'] > 0)
 
270
        self.assertTrue(lines['otherchild\n'] > 0)
 
271
 
 
272
    def test_fix_parents(self):
 
273
        # some versioned files allow incorrect parents to be corrected after
 
274
        # insertion - this may not fix ancestry..
 
275
        # if they do not supported, they just do not implement it.
 
276
        vf = self.get_file()
 
277
        vf.add_lines('notbase', [], [])
 
278
        vf.add_lines('base', [], [])
 
279
        try:
 
280
            vf.fix_parents('notbase', ['base'])
 
281
        except NotImplementedError:
 
282
            return
 
283
        self.assertEqual(['base'], vf.get_parents('notbase'))
 
284
        # open again, check it stuck.
 
285
        vf = self.get_file()
 
286
        self.assertEqual(['base'], vf.get_parents('notbase'))
 
287
 
 
288
 
 
289
class TestWeave(TestCaseWithTransport, VersionedFileTestMixIn):
 
290
 
 
291
    def get_file(self, name='foo'):
 
292
        return WeaveFile(name, get_transport(self.get_url('.')), create=True)
 
293
 
 
294
    def get_file_corrupted_text(self):
 
295
        w = WeaveFile('foo', get_transport(self.get_url('.')), create=True)
 
296
        w.add_lines('v1', [], ['hello\n'])
 
297
        w.add_lines('v2', ['v1'], ['hello\n', 'there\n'])
 
298
        
 
299
        # We are going to invasively corrupt the text
 
300
        # Make sure the internals of weave are the same
 
301
        self.assertEqual([('{', 0)
 
302
                        , 'hello\n'
 
303
                        , ('}', None)
 
304
                        , ('{', 1)
 
305
                        , 'there\n'
 
306
                        , ('}', None)
 
307
                        ], w._weave)
 
308
        
 
309
        self.assertEqual(['f572d396fae9206628714fb2ce00f72e94f2258f'
 
310
                        , '90f265c6e75f1c8f9ab76dcf85528352c5f215ef'
 
311
                        ], w._sha1s)
 
312
        w.check()
 
313
        
 
314
        # Corrupted
 
315
        w._weave[4] = 'There\n'
 
316
        return w
 
317
 
 
318
    def get_file_corrupted_checksum(self):
 
319
        w = self.get_file_corrupted_text()
 
320
        # Corrected
 
321
        w._weave[4] = 'there\n'
 
322
        self.assertEqual('hello\nthere\n', w.get_text('v2'))
 
323
        
 
324
        #Invalid checksum, first digit changed
 
325
        w._sha1s[1] =  'f0f265c6e75f1c8f9ab76dcf85528352c5f215ef'
 
326
        return w
 
327
 
 
328
    def reopen_file(self, name='foo'):
 
329
        return WeaveFile(name, get_transport(self.get_url('.')))
 
330
 
 
331
    def test_no_implicit_create(self):
 
332
        self.assertRaises(errors.NoSuchFile,
 
333
                          WeaveFile,
 
334
                          'foo',
 
335
                          get_transport(self.get_url('.')))
 
336
 
 
337
 
 
338
class TestKnit(TestCaseWithTransport, VersionedFileTestMixIn):
 
339
 
 
340
    def get_file(self, name='foo'):
 
341
        return KnitVersionedFile(name, get_transport(self.get_url('.')),
 
342
                                 delta=True, create=True)
 
343
 
 
344
    def get_file_corrupted_text(self):
 
345
        knit = self.get_file()
 
346
        knit.add_lines('v1', [], ['hello\n'])
 
347
        knit.add_lines('v2', ['v1'], ['hello\n', 'there\n'])
 
348
        return knit
 
349
 
 
350
    def reopen_file(self, name='foo'):
 
351
        return KnitVersionedFile(name, get_transport(self.get_url('.')), delta=True)
 
352
 
 
353
    def test_detection(self):
 
354
        print "TODO for merging: create a corrupted knit."
 
355
        knit = self.get_file()
 
356
        knit.check()
 
357
 
 
358
    def test_no_implicit_create(self):
 
359
        self.assertRaises(errors.NoSuchFile,
 
360
                          KnitVersionedFile,
 
361
                          'foo',
 
362
                          get_transport(self.get_url('.')))
 
363
 
 
364
 
 
365
class InterString(versionedfile.InterVersionedFile):
 
366
    """An inter-versionedfile optimised code path for strings.
 
367
 
 
368
    This is for use during testing where we use strings as versionedfiles
 
369
    so that none of the default regsitered interversionedfile classes will
 
370
    match - which lets us test the match logic.
 
371
    """
 
372
 
 
373
    @staticmethod
 
374
    def is_compatible(source, target):
 
375
        """InterString is compatible with strings-as-versionedfiles."""
 
376
        return isinstance(source, str) and isinstance(target, str)
 
377
 
 
378
 
 
379
# TODO this and the InterRepository core logic should be consolidatable
 
380
# if we make the registry a separate class though we still need to 
 
381
# test the behaviour in the active registry to catch failure-to-handle-
 
382
# stange-objects
 
383
class TestInterVersionedFile(TestCaseWithTransport):
 
384
 
 
385
    def test_get_default_inter_versionedfile(self):
 
386
        # test that the InterVersionedFile.get(a, b) probes
 
387
        # for a class where is_compatible(a, b) returns
 
388
        # true and returns a default interversionedfile otherwise.
 
389
        # This also tests that the default registered optimised interversionedfile
 
390
        # classes do not barf inappropriately when a surprising versionedfile type
 
391
        # is handed to them.
 
392
        dummy_a = "VersionedFile 1."
 
393
        dummy_b = "VersionedFile 2."
 
394
        self.assertGetsDefaultInterVersionedFile(dummy_a, dummy_b)
 
395
 
 
396
    def assertGetsDefaultInterVersionedFile(self, a, b):
 
397
        """Asserts that InterVersionedFile.get(a, b) -> the default."""
 
398
        inter = versionedfile.InterVersionedFile.get(a, b)
 
399
        self.assertEqual(versionedfile.InterVersionedFile,
 
400
                         inter.__class__)
 
401
        self.assertEqual(a, inter.source)
 
402
        self.assertEqual(b, inter.target)
 
403
 
 
404
    def test_register_inter_versionedfile_class(self):
 
405
        # test that a optimised code path provider - a
 
406
        # InterVersionedFile subclass can be registered and unregistered
 
407
        # and that it is correctly selected when given a versionedfile
 
408
        # pair that it returns true on for the is_compatible static method
 
409
        # check
 
410
        dummy_a = "VersionedFile 1."
 
411
        dummy_b = "VersionedFile 2."
 
412
        versionedfile.InterVersionedFile.register_optimiser(InterString)
 
413
        try:
 
414
            # we should get the default for something InterString returns False
 
415
            # to
 
416
            self.assertFalse(InterString.is_compatible(dummy_a, None))
 
417
            self.assertGetsDefaultInterVersionedFile(dummy_a, None)
 
418
            # and we should get an InterString for a pair it 'likes'
 
419
            self.assertTrue(InterString.is_compatible(dummy_a, dummy_b))
 
420
            inter = versionedfile.InterVersionedFile.get(dummy_a, dummy_b)
 
421
            self.assertEqual(InterString, inter.__class__)
 
422
            self.assertEqual(dummy_a, inter.source)
 
423
            self.assertEqual(dummy_b, inter.target)
 
424
        finally:
 
425
            versionedfile.InterVersionedFile.unregister_optimiser(InterString)
 
426
        # now we should get the default InterVersionedFile object again.
 
427
        self.assertGetsDefaultInterVersionedFile(dummy_a, dummy_b)