/brz/remove-bazaar

To get this branch, use:
bzr branch http://gegoxaren.bato24.eu/bzr/brz/remove-bazaar

« back to all changes in this revision

Viewing changes to bzrlib/tests/test_versionedfile.py

Introduce a api specifically for looking at lines in some versions of the inventory, for fileid_involved.

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
# Copyright (C) 2005 by Canonical Ltd
 
2
#
 
3
# Authors:
 
4
#   Johan Rydberg <jrydberg@gnu.org>
 
5
#
 
6
# This program is free software; you can redistribute it and/or modify
 
7
# it under the terms of the GNU General Public License as published by
 
8
# the Free Software Foundation; either version 2 of the License, or
 
9
# (at your option) any later version.
 
10
 
 
11
# This program is distributed in the hope that it will be useful,
 
12
# but WITHOUT ANY WARRANTY; without even the implied warranty of
 
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
14
# GNU General Public License for more details.
 
15
 
 
16
# You should have received a copy of the GNU General Public License
 
17
# along with this program; if not, write to the Free Software
 
18
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
 
19
 
 
20
 
 
21
import bzrlib
 
22
import bzrlib.errors as errors
 
23
from bzrlib.errors import (
 
24
                           RevisionNotPresent, 
 
25
                           RevisionAlreadyPresent,
 
26
                           WeaveParentMismatch
 
27
                           )
 
28
from bzrlib.knit import KnitVersionedFile, \
 
29
     KnitAnnotateFactory
 
30
from bzrlib.tests import TestCaseWithTransport
 
31
from bzrlib.trace import mutter
 
32
from bzrlib.transport import get_transport
 
33
from bzrlib.transport.memory import MemoryTransport
 
34
import bzrlib.versionedfile as versionedfile
 
35
from bzrlib.weave import WeaveFile
 
36
from bzrlib.weavefile import read_weave
 
37
 
 
38
 
 
39
class VersionedFileTestMixIn(object):
 
40
    """A mixin test class for testing VersionedFiles.
 
41
 
 
42
    This is not an adaptor-style test at this point because
 
43
    theres no dynamic substitution of versioned file implementations,
 
44
    they are strictly controlled by their owning repositories.
 
45
    """
 
46
 
 
47
    def test_add(self):
 
48
        f = self.get_file()
 
49
        f.add_lines('r0', [], ['a\n', 'b\n'])
 
50
        f.add_lines('r1', ['r0'], ['b\n', 'c\n'])
 
51
        def verify_file(f):
 
52
            versions = f.versions()
 
53
            self.assertTrue('r0' in versions)
 
54
            self.assertTrue('r1' in versions)
 
55
            self.assertEquals(f.get_lines('r0'), ['a\n', 'b\n'])
 
56
            self.assertEquals(f.get_text('r0'), 'a\nb\n')
 
57
            self.assertEquals(f.get_lines('r1'), ['b\n', 'c\n'])
 
58
            self.assertEqual(2, len(f))
 
59
            self.assertEqual(2, f.num_versions())
 
60
    
 
61
            self.assertRaises(RevisionNotPresent,
 
62
                f.add_lines, 'r2', ['foo'], [])
 
63
            self.assertRaises(RevisionAlreadyPresent,
 
64
                f.add_lines, 'r1', [], [])
 
65
        verify_file(f)
 
66
        f = self.reopen_file()
 
67
        verify_file(f)
 
68
 
 
69
    def test_ancestry(self):
 
70
        f = self.get_file()
 
71
        self.assertEqual([], f.get_ancestry([]))
 
72
        f.add_lines('r0', [], ['a\n', 'b\n'])
 
73
        f.add_lines('r1', ['r0'], ['b\n', 'c\n'])
 
74
        f.add_lines('r2', ['r0'], ['b\n', 'c\n'])
 
75
        f.add_lines('r3', ['r2'], ['b\n', 'c\n'])
 
76
        f.add_lines('rM', ['r1', 'r2'], ['b\n', 'c\n'])
 
77
        self.assertEqual([], f.get_ancestry([]))
 
78
        versions = f.get_ancestry(['rM'])
 
79
        # there are some possibilities:
 
80
        # r0 r1 r2 rM r3
 
81
        # r0 r1 r2 r3 rM
 
82
        # etc
 
83
        # so we check indexes
 
84
        r0 = versions.index('r0')
 
85
        r1 = versions.index('r1')
 
86
        r2 = versions.index('r2')
 
87
        self.assertFalse('r3' in versions)
 
88
        rM = versions.index('rM')
 
89
        self.assertTrue(r0 < r1)
 
90
        self.assertTrue(r0 < r2)
 
91
        self.assertTrue(r1 < rM)
 
92
        self.assertTrue(r2 < rM)
 
93
 
 
94
        self.assertRaises(RevisionNotPresent,
 
95
            f.get_ancestry, ['rM', 'rX'])
 
96
        
 
97
    def test_clear_cache(self):
 
98
        f = self.get_file()
 
99
        # on a new file it should not error
 
100
        f.clear_cache()
 
101
        # and after adding content, doing a clear_cache and a get should work.
 
102
        f.add_lines('0', [], ['a'])
 
103
        f.clear_cache()
 
104
        self.assertEqual(['a'], f.get_lines('0'))
 
105
 
 
106
    def test_clone_text(self):
 
107
        f = self.get_file()
 
108
        f.add_lines('r0', [], ['a\n', 'b\n'])
 
109
        f.clone_text('r1', 'r0', ['r0'])
 
110
        def verify_file(f):
 
111
            self.assertEquals(f.get_lines('r1'), f.get_lines('r0'))
 
112
            self.assertEquals(f.get_lines('r1'), ['a\n', 'b\n'])
 
113
            self.assertEquals(f.get_parents('r1'), ['r0'])
 
114
    
 
115
            self.assertRaises(RevisionNotPresent,
 
116
                f.clone_text, 'r2', 'rX', [])
 
117
            self.assertRaises(RevisionAlreadyPresent,
 
118
                f.clone_text, 'r1', 'r0', [])
 
119
        verify_file(f)
 
120
        verify_file(self.reopen_file())
 
121
 
 
122
    def test_create_empty(self):
 
123
        f = self.get_file()
 
124
        f.add_lines('0', [], ['a\n'])
 
125
        new_f = f.create_empty('t', MemoryTransport())
 
126
        # smoke test, specific types should check it is honoured correctly for
 
127
        # non type attributes
 
128
        self.assertEqual([], new_f.versions())
 
129
        self.assertTrue(isinstance(new_f, f.__class__))
 
130
 
 
131
    def test_copy_to(self):
 
132
        f = self.get_file()
 
133
        f.add_lines('0', [], ['a\n'])
 
134
        t = MemoryTransport()
 
135
        f.copy_to('foo', t)
 
136
        for suffix in f.__class__.get_suffixes():
 
137
            self.assertTrue(t.has('foo' + suffix))
 
138
 
 
139
    def test_get_suffixes(self):
 
140
        f = self.get_file()
 
141
        # should be the same
 
142
        self.assertEqual(f.__class__.get_suffixes(), f.__class__.get_suffixes())
 
143
        # and should be a list
 
144
        self.assertTrue(isinstance(f.__class__.get_suffixes(), list))
 
145
 
 
146
    def test_get_graph(self):
 
147
        f = self.get_file()
 
148
        f.add_lines('v1', [], ['hello\n'])
 
149
        f.add_lines('v2', ['v1'], ['hello\n', 'world\n'])
 
150
        f.add_lines('v3', ['v2'], ['hello\n', 'cruel\n', 'world\n'])
 
151
        self.assertEqual({'v1': [],
 
152
                          'v2': ['v1'],
 
153
                          'v3': ['v2']},
 
154
                         f.get_graph())
 
155
 
 
156
    def test_get_parents(self):
 
157
        f = self.get_file()
 
158
        f.add_lines('r0', [], ['a\n', 'b\n'])
 
159
        f.add_lines('r1', [], ['a\n', 'b\n'])
 
160
        f.add_lines('r2', [], ['a\n', 'b\n'])
 
161
        f.add_lines('r3', [], ['a\n', 'b\n'])
 
162
        f.add_lines('m', ['r0', 'r1', 'r2', 'r3'], ['a\n', 'b\n'])
 
163
        self.assertEquals(f.get_parents('m'), ['r0', 'r1', 'r2', 'r3'])
 
164
 
 
165
        self.assertRaises(RevisionNotPresent,
 
166
            f.get_parents, 'y')
 
167
 
 
168
    def test_annotate(self):
 
169
        f = self.get_file()
 
170
        f.add_lines('r0', [], ['a\n', 'b\n'])
 
171
        f.add_lines('r1', ['r0'], ['c\n', 'b\n'])
 
172
        origins = f.annotate('r1')
 
173
        self.assertEquals(origins[0][0], 'r1')
 
174
        self.assertEquals(origins[1][0], 'r0')
 
175
 
 
176
        self.assertRaises(RevisionNotPresent,
 
177
            f.annotate, 'foo')
 
178
 
 
179
    def test_walk(self):
 
180
        # tests that walk returns all the inclusions for the requested
 
181
        # revisions as well as the revisions changes themselves.
 
182
        f = self.get_file('1')
 
183
        f.add_lines('r0', [], ['a\n', 'b\n'])
 
184
        f.add_lines('r1', ['r0'], ['c\n', 'b\n'])
 
185
        f.add_lines('rX', ['r1'], ['d\n', 'b\n'])
 
186
        f.add_lines('rY', ['r1'], ['c\n', 'e\n'])
 
187
 
 
188
        lines = {}
 
189
        for lineno, insert, dset, text in f.walk(['rX', 'rY']):
 
190
            lines[text] = (insert, dset)
 
191
 
 
192
        self.assertTrue(lines['a\n'], ('r0', set(['r1'])))
 
193
        self.assertTrue(lines['b\n'], ('r0', set(['rY'])))
 
194
        self.assertTrue(lines['c\n'], ('r1', set(['rX'])))
 
195
        self.assertTrue(lines['d\n'], ('rX', set([])))
 
196
        self.assertTrue(lines['e\n'], ('rY', set([])))
 
197
 
 
198
    def test_detection(self):
 
199
        # Test weaves detect corruption.
 
200
        #
 
201
        # Weaves contain a checksum of their texts.
 
202
        # When a text is extracted, this checksum should be
 
203
        # verified.
 
204
 
 
205
        w = self.get_file_corrupted_text()
 
206
 
 
207
        self.assertEqual('hello\n', w.get_text('v1'))
 
208
        self.assertRaises(errors.WeaveInvalidChecksum, w.get_text, 'v2')
 
209
        self.assertRaises(errors.WeaveInvalidChecksum, w.get_lines, 'v2')
 
210
        self.assertRaises(errors.WeaveInvalidChecksum, w.check)
 
211
 
 
212
        w = self.get_file_corrupted_checksum()
 
213
 
 
214
        self.assertEqual('hello\n', w.get_text('v1'))
 
215
        self.assertRaises(errors.WeaveInvalidChecksum, w.get_text, 'v2')
 
216
        self.assertRaises(errors.WeaveInvalidChecksum, w.get_lines, 'v2')
 
217
        self.assertRaises(errors.WeaveInvalidChecksum, w.check)
 
218
 
 
219
    def get_file_corrupted_text(self):
 
220
        """Return a versioned file with corrupt text but valid metadata."""
 
221
        raise NotImplementedError(self.get_file_corrupted_text)
 
222
 
 
223
    def reopen_file(self, name='foo'):
 
224
        """Open the versioned file from disk again."""
 
225
        raise NotImplementedError(self.reopen_file)
 
226
 
 
227
    def test_iter_lines_added_or_present_in_versions(self):
 
228
        # test that we get at least an equalset of the lines added by
 
229
        # versions in the weave 
 
230
        # the ordering here is to make a tree so that dumb searches have
 
231
        # more changes to muck up.
 
232
        vf = self.get_file()
 
233
        # add a base to get included
 
234
        vf.add_lines('base', [], ['base\n'])
 
235
        # add a ancestor to be included on one side
 
236
        vf.add_lines('lancestor', [], ['lancestor\n'])
 
237
        # add a ancestor to be included on the other side
 
238
        vf.add_lines('rancestor', ['base'], ['rancestor\n'])
 
239
        # add a child of rancestor with no eofile-nl
 
240
        vf.add_lines('child', ['rancestor'], ['base\n', 'child\n'])
 
241
        # add a child of lancestor and base to join the two roots
 
242
        vf.add_lines('otherchild',
 
243
                     ['lancestor', 'base'],
 
244
                     ['base\n', 'lancestor\n', 'otherchild\n'])
 
245
        def iter_with_versions(versions):
 
246
            # now we need to see what lines are returned, and how often.
 
247
            lines = {'base\n':0,
 
248
                     'lancestor\n':0,
 
249
                     'rancestor\n':0,
 
250
                     'child\n':0,
 
251
                     'otherchild\n':0,
 
252
                     }
 
253
            # iterate over the lines
 
254
            for line in vf.iter_lines_added_or_present_in_versions(versions):
 
255
                lines[line] += 1
 
256
            return lines
 
257
        lines = iter_with_versions(['child', 'otherchild'])
 
258
        # we must see child and otherchild
 
259
        self.assertTrue(lines['child\n'] > 0)
 
260
        self.assertTrue(lines['otherchild\n'] > 0)
 
261
        # we dont care if we got more than that.
 
262
        
 
263
        # test all lines
 
264
        lines = iter_with_versions(None)
 
265
        # all lines must be seen at least once
 
266
        self.assertTrue(lines['base\n'] > 0)
 
267
        self.assertTrue(lines['lancestor\n'] > 0)
 
268
        self.assertTrue(lines['rancestor\n'] > 0)
 
269
        self.assertTrue(lines['child\n'] > 0)
 
270
        self.assertTrue(lines['otherchild\n'] > 0)
 
271
        
 
272
 
 
273
class TestWeave(TestCaseWithTransport, VersionedFileTestMixIn):
 
274
 
 
275
    def get_file(self, name='foo'):
 
276
        return WeaveFile(name, get_transport(self.get_url('.')), create=True)
 
277
 
 
278
    def get_file_corrupted_text(self):
 
279
        w = WeaveFile('foo', get_transport(self.get_url('.')), create=True)
 
280
        w.add_lines('v1', [], ['hello\n'])
 
281
        w.add_lines('v2', ['v1'], ['hello\n', 'there\n'])
 
282
        
 
283
        # We are going to invasively corrupt the text
 
284
        # Make sure the internals of weave are the same
 
285
        self.assertEqual([('{', 0)
 
286
                        , 'hello\n'
 
287
                        , ('}', None)
 
288
                        , ('{', 1)
 
289
                        , 'there\n'
 
290
                        , ('}', None)
 
291
                        ], w._weave)
 
292
        
 
293
        self.assertEqual(['f572d396fae9206628714fb2ce00f72e94f2258f'
 
294
                        , '90f265c6e75f1c8f9ab76dcf85528352c5f215ef'
 
295
                        ], w._sha1s)
 
296
        w.check()
 
297
        
 
298
        # Corrupted
 
299
        w._weave[4] = 'There\n'
 
300
        return w
 
301
 
 
302
    def get_file_corrupted_checksum(self):
 
303
        w = self.get_file_corrupted_text()
 
304
        # Corrected
 
305
        w._weave[4] = 'there\n'
 
306
        self.assertEqual('hello\nthere\n', w.get_text('v2'))
 
307
        
 
308
        #Invalid checksum, first digit changed
 
309
        w._sha1s[1] =  'f0f265c6e75f1c8f9ab76dcf85528352c5f215ef'
 
310
        return w
 
311
 
 
312
    def reopen_file(self, name='foo'):
 
313
        return WeaveFile(name, get_transport(self.get_url('.')))
 
314
 
 
315
    def test_no_implicit_create(self):
 
316
        self.assertRaises(errors.NoSuchFile,
 
317
                          WeaveFile,
 
318
                          'foo',
 
319
                          get_transport(self.get_url('.')))
 
320
 
 
321
 
 
322
class TestKnit(TestCaseWithTransport, VersionedFileTestMixIn):
 
323
 
 
324
    def get_file(self, name='foo'):
 
325
        return KnitVersionedFile(name, get_transport(self.get_url('.')),
 
326
                                 delta=True, create=True)
 
327
 
 
328
    def get_file_corrupted_text(self):
 
329
        knit = self.get_file()
 
330
        knit.add_lines('v1', [], ['hello\n'])
 
331
        knit.add_lines('v2', ['v1'], ['hello\n', 'there\n'])
 
332
        return knit
 
333
 
 
334
    def reopen_file(self, name='foo'):
 
335
        return KnitVersionedFile(name, get_transport(self.get_url('.')), delta=True)
 
336
 
 
337
    def test_detection(self):
 
338
        print "TODO for merging: create a corrupted knit."
 
339
        knit = self.get_file()
 
340
        knit.check()
 
341
 
 
342
    def test_no_implicit_create(self):
 
343
        self.assertRaises(errors.NoSuchFile,
 
344
                          KnitVersionedFile,
 
345
                          'foo',
 
346
                          get_transport(self.get_url('.')))
 
347
 
 
348
 
 
349
class InterString(versionedfile.InterVersionedFile):
 
350
    """An inter-versionedfile optimised code path for strings.
 
351
 
 
352
    This is for use during testing where we use strings as versionedfiles
 
353
    so that none of the default regsitered interversionedfile classes will
 
354
    match - which lets us test the match logic.
 
355
    """
 
356
 
 
357
    @staticmethod
 
358
    def is_compatible(source, target):
 
359
        """InterString is compatible with strings-as-versionedfiles."""
 
360
        return isinstance(source, str) and isinstance(target, str)
 
361
 
 
362
 
 
363
# TODO this and the InterRepository core logic should be consolidatable
 
364
# if we make the registry a separate class though we still need to 
 
365
# test the behaviour in the active registry to catch failure-to-handle-
 
366
# stange-objects
 
367
class TestInterVersionedFile(TestCaseWithTransport):
 
368
 
 
369
    def test_get_default_inter_versionedfile(self):
 
370
        # test that the InterVersionedFile.get(a, b) probes
 
371
        # for a class where is_compatible(a, b) returns
 
372
        # true and returns a default interversionedfile otherwise.
 
373
        # This also tests that the default registered optimised interversionedfile
 
374
        # classes do not barf inappropriately when a surprising versionedfile type
 
375
        # is handed to them.
 
376
        dummy_a = "VersionedFile 1."
 
377
        dummy_b = "VersionedFile 2."
 
378
        self.assertGetsDefaultInterVersionedFile(dummy_a, dummy_b)
 
379
 
 
380
    def assertGetsDefaultInterVersionedFile(self, a, b):
 
381
        """Asserts that InterVersionedFile.get(a, b) -> the default."""
 
382
        inter = versionedfile.InterVersionedFile.get(a, b)
 
383
        self.assertEqual(versionedfile.InterVersionedFile,
 
384
                         inter.__class__)
 
385
        self.assertEqual(a, inter.source)
 
386
        self.assertEqual(b, inter.target)
 
387
 
 
388
    def test_register_inter_versionedfile_class(self):
 
389
        # test that a optimised code path provider - a
 
390
        # InterVersionedFile subclass can be registered and unregistered
 
391
        # and that it is correctly selected when given a versionedfile
 
392
        # pair that it returns true on for the is_compatible static method
 
393
        # check
 
394
        dummy_a = "VersionedFile 1."
 
395
        dummy_b = "VersionedFile 2."
 
396
        versionedfile.InterVersionedFile.register_optimiser(InterString)
 
397
        try:
 
398
            # we should get the default for something InterString returns False
 
399
            # to
 
400
            self.assertFalse(InterString.is_compatible(dummy_a, None))
 
401
            self.assertGetsDefaultInterVersionedFile(dummy_a, None)
 
402
            # and we should get an InterString for a pair it 'likes'
 
403
            self.assertTrue(InterString.is_compatible(dummy_a, dummy_b))
 
404
            inter = versionedfile.InterVersionedFile.get(dummy_a, dummy_b)
 
405
            self.assertEqual(InterString, inter.__class__)
 
406
            self.assertEqual(dummy_a, inter.source)
 
407
            self.assertEqual(dummy_b, inter.target)
 
408
        finally:
 
409
            versionedfile.InterVersionedFile.unregister_optimiser(InterString)
 
410
        # now we should get the default InterVersionedFile object again.
 
411
        self.assertGetsDefaultInterVersionedFile(dummy_a, dummy_b)