/brz/remove-bazaar

To get this branch, use:
bzr branch http://gegoxaren.bato24.eu/bzr/brz/remove-bazaar

« back to all changes in this revision

Viewing changes to bzrlib/tests/test_versionedfile.py

  • Committer: Robert Collins
  • Date: 2006-03-06 09:38:37 UTC
  • mto: (1594.2.4 integration)
  • mto: This revision was merged to the branch mainline in revision 1596.
  • Revision ID: robertc@robertcollins.net-20060306093837-b151989e9572895e
Remove all but fetch references to repository.revision_store.

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
# Copyright (C) 2005 by Canonical Ltd
 
2
#
 
3
# Authors:
 
4
#   Johan Rydberg <jrydberg@gnu.org>
 
5
#
 
6
# This program is free software; you can redistribute it and/or modify
 
7
# it under the terms of the GNU General Public License as published by
 
8
# the Free Software Foundation; either version 2 of the License, or
 
9
# (at your option) any later version.
 
10
 
 
11
# This program is distributed in the hope that it will be useful,
 
12
# but WITHOUT ANY WARRANTY; without even the implied warranty of
 
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
14
# GNU General Public License for more details.
 
15
 
 
16
# You should have received a copy of the GNU General Public License
 
17
# along with this program; if not, write to the Free Software
 
18
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
 
19
 
 
20
 
 
21
import bzrlib
 
22
import bzrlib.errors as errors
 
23
from bzrlib.errors import (
 
24
                           RevisionNotPresent, 
 
25
                           RevisionAlreadyPresent,
 
26
                           WeaveParentMismatch
 
27
                           )
 
28
from bzrlib.knit import KnitVersionedFile, \
 
29
     KnitAnnotateFactory
 
30
from bzrlib.tests import TestCaseWithTransport
 
31
from bzrlib.trace import mutter
 
32
from bzrlib.transport import get_transport
 
33
from bzrlib.transport.memory import MemoryTransport
 
34
import bzrlib.versionedfile as versionedfile
 
35
from bzrlib.weave import WeaveFile
 
36
from bzrlib.weavefile import read_weave
 
37
 
 
38
 
 
39
class VersionedFileTestMixIn(object):
 
40
    """A mixin test class for testing VersionedFiles.
 
41
 
 
42
    This is not an adaptor-style test at this point because
 
43
    theres no dynamic substitution of versioned file implementations,
 
44
    they are strictly controlled by their owning repositories.
 
45
    """
 
46
 
 
47
    def test_add(self):
 
48
        f = self.get_file()
 
49
        f.add_lines('r0', [], ['a\n', 'b\n'])
 
50
        f.add_lines('r1', ['r0'], ['b\n', 'c\n'])
 
51
        def verify_file(f):
 
52
            versions = f.versions()
 
53
            self.assertTrue('r0' in versions)
 
54
            self.assertTrue('r1' in versions)
 
55
            self.assertEquals(f.get_lines('r0'), ['a\n', 'b\n'])
 
56
            self.assertEquals(f.get_text('r0'), 'a\nb\n')
 
57
            self.assertEquals(f.get_lines('r1'), ['b\n', 'c\n'])
 
58
            self.assertEqual(2, len(f))
 
59
            self.assertEqual(2, f.num_versions())
 
60
    
 
61
            self.assertRaises(RevisionNotPresent,
 
62
                f.add_lines, 'r2', ['foo'], [])
 
63
            self.assertRaises(RevisionAlreadyPresent,
 
64
                f.add_lines, 'r1', [], [])
 
65
        verify_file(f)
 
66
        f = self.reopen_file()
 
67
        verify_file(f)
 
68
 
 
69
    def test_ancestry(self):
 
70
        f = self.get_file()
 
71
        self.assertEqual([], f.get_ancestry([]))
 
72
        f.add_lines('r0', [], ['a\n', 'b\n'])
 
73
        f.add_lines('r1', ['r0'], ['b\n', 'c\n'])
 
74
        f.add_lines('r2', ['r0'], ['b\n', 'c\n'])
 
75
        f.add_lines('r3', ['r2'], ['b\n', 'c\n'])
 
76
        f.add_lines('rM', ['r1', 'r2'], ['b\n', 'c\n'])
 
77
        self.assertEqual([], f.get_ancestry([]))
 
78
        versions = set(f.get_ancestry(['rM']))
 
79
        self.assertEquals(versions, set(['rM', 'r2', 'r1', 'r0']))
 
80
 
 
81
        self.assertRaises(RevisionNotPresent,
 
82
            f.get_ancestry, ['rM', 'rX'])
 
83
        
 
84
    def test_clear_cache(self):
 
85
        f = self.get_file()
 
86
        # on a new file it should not error
 
87
        f.clear_cache()
 
88
        # and after adding content, doing a clear_cache and a get should work.
 
89
        f.add_lines('0', [], ['a'])
 
90
        f.clear_cache()
 
91
        self.assertEqual(['a'], f.get_lines('0'))
 
92
 
 
93
    def test_clone_text(self):
 
94
        f = self.get_file()
 
95
        f.add_lines('r0', [], ['a\n', 'b\n'])
 
96
        f.clone_text('r1', 'r0', ['r0'])
 
97
        def verify_file(f):
 
98
            self.assertEquals(f.get_lines('r1'), f.get_lines('r0'))
 
99
            self.assertEquals(f.get_lines('r1'), ['a\n', 'b\n'])
 
100
            self.assertEquals(f.get_parents('r1'), ['r0'])
 
101
    
 
102
            self.assertRaises(RevisionNotPresent,
 
103
                f.clone_text, 'r2', 'rX', [])
 
104
            self.assertRaises(RevisionAlreadyPresent,
 
105
                f.clone_text, 'r1', 'r0', [])
 
106
        verify_file(f)
 
107
        verify_file(self.reopen_file())
 
108
 
 
109
    def test_create_empty(self):
 
110
        f = self.get_file()
 
111
        f.add_lines('0', [], ['a\n'])
 
112
        new_f = f.create_empty('t', MemoryTransport())
 
113
        # smoke test, specific types should check it is honoured correctly for
 
114
        # non type attributes
 
115
        self.assertEqual([], new_f.versions())
 
116
        self.assertTrue(isinstance(new_f, f.__class__))
 
117
 
 
118
    def test_copy_to(self):
 
119
        f = self.get_file()
 
120
        f.add_lines('0', [], ['a\n'])
 
121
        t = MemoryTransport()
 
122
        f.copy_to('foo', t)
 
123
        for suffix in f.__class__.get_suffixes():
 
124
            self.assertTrue(t.has('foo' + suffix))
 
125
 
 
126
    def test_get_suffixes(self):
 
127
        f = self.get_file()
 
128
        # should be the same
 
129
        self.assertEqual(f.__class__.get_suffixes(), f.__class__.get_suffixes())
 
130
        # and should be a list
 
131
        self.assertTrue(isinstance(f.__class__.get_suffixes(), list))
 
132
 
 
133
    def test_get_graph(self):
 
134
        f = self.get_file()
 
135
        f.add_lines('v1', [], ['hello\n'])
 
136
        f.add_lines('v2', ['v1'], ['hello\n', 'world\n'])
 
137
        f.add_lines('v3', ['v2'], ['hello\n', 'cruel\n', 'world\n'])
 
138
        self.assertEqual({'v1': [],
 
139
                          'v2': ['v1'],
 
140
                          'v3': ['v2']},
 
141
                         f.get_graph())
 
142
 
 
143
    def test_get_parents(self):
 
144
        f = self.get_file()
 
145
        f.add_lines('r0', [], ['a\n', 'b\n'])
 
146
        f.add_lines('r1', [], ['a\n', 'b\n'])
 
147
        f.add_lines('r2', [], ['a\n', 'b\n'])
 
148
        f.add_lines('r3', [], ['a\n', 'b\n'])
 
149
        f.add_lines('m', ['r0', 'r1', 'r2', 'r3'], ['a\n', 'b\n'])
 
150
        self.assertEquals(f.get_parents('m'), ['r0', 'r1', 'r2', 'r3'])
 
151
 
 
152
        self.assertRaises(RevisionNotPresent,
 
153
            f.get_parents, 'y')
 
154
 
 
155
    def test_annotate(self):
 
156
        f = self.get_file()
 
157
        f.add_lines('r0', [], ['a\n', 'b\n'])
 
158
        f.add_lines('r1', ['r0'], ['c\n', 'b\n'])
 
159
        origins = f.annotate('r1')
 
160
        self.assertEquals(origins[0][0], 'r1')
 
161
        self.assertEquals(origins[1][0], 'r0')
 
162
 
 
163
        self.assertRaises(RevisionNotPresent,
 
164
            f.annotate, 'foo')
 
165
 
 
166
    def test_walk(self):
 
167
        f = self.get_file('1')
 
168
        f.add_lines('r0', [], ['a\n', 'b\n'])
 
169
        f.add_lines('r1', ['r0'], ['c\n', 'b\n'])
 
170
        f.add_lines('rX', ['r1'], ['d\n', 'b\n'])
 
171
        f.add_lines('rY', ['r1'], ['c\n', 'e\n'])
 
172
 
 
173
        lines = {}
 
174
        for lineno, insert, dset, text in f.walk(['rX', 'rY']):
 
175
            lines[text] = (insert, dset)
 
176
 
 
177
        self.assertTrue(lines['a\n'], ('r0', set(['r1'])))
 
178
        self.assertTrue(lines['b\n'], ('r0', set(['rY'])))
 
179
        self.assertTrue(lines['c\n'], ('r1', set(['rX'])))
 
180
        self.assertTrue(lines['d\n'], ('rX', set([])))
 
181
        self.assertTrue(lines['e\n'], ('rY', set([])))
 
182
 
 
183
    def test_detection(self):
 
184
        # Test weaves detect corruption.
 
185
        #
 
186
        # Weaves contain a checksum of their texts.
 
187
        # When a text is extracted, this checksum should be
 
188
        # verified.
 
189
 
 
190
        w = self.get_file_corrupted_text()
 
191
 
 
192
        self.assertEqual('hello\n', w.get_text('v1'))
 
193
        self.assertRaises(errors.WeaveInvalidChecksum, w.get_text, 'v2')
 
194
        self.assertRaises(errors.WeaveInvalidChecksum, w.get_lines, 'v2')
 
195
        self.assertRaises(errors.WeaveInvalidChecksum, w.check)
 
196
 
 
197
        w = self.get_file_corrupted_checksum()
 
198
 
 
199
        self.assertEqual('hello\n', w.get_text('v1'))
 
200
        self.assertRaises(errors.WeaveInvalidChecksum, w.get_text, 'v2')
 
201
        self.assertRaises(errors.WeaveInvalidChecksum, w.get_lines, 'v2')
 
202
        self.assertRaises(errors.WeaveInvalidChecksum, w.check)
 
203
 
 
204
    def get_file_corrupted_text(self):
 
205
        """Return a versioned file with corrupt text but valid metadata."""
 
206
        raise NotImplementedError(self.get_file_corrupted_text)
 
207
 
 
208
    def reopen_file(self, name='foo'):
 
209
        """Open the versioned file from disk again."""
 
210
        raise NotImplementedError(self.reopen_file)
 
211
 
 
212
 
 
213
class TestWeave(TestCaseWithTransport, VersionedFileTestMixIn):
 
214
 
 
215
    def get_file(self, name='foo'):
 
216
        return WeaveFile(name, get_transport(self.get_url('.')), create=True)
 
217
 
 
218
    def get_file_corrupted_text(self):
 
219
        w = WeaveFile('foo', get_transport(self.get_url('.')), create=True)
 
220
        w.add_lines('v1', [], ['hello\n'])
 
221
        w.add_lines('v2', ['v1'], ['hello\n', 'there\n'])
 
222
        
 
223
        # We are going to invasively corrupt the text
 
224
        # Make sure the internals of weave are the same
 
225
        self.assertEqual([('{', 0)
 
226
                        , 'hello\n'
 
227
                        , ('}', None)
 
228
                        , ('{', 1)
 
229
                        , 'there\n'
 
230
                        , ('}', None)
 
231
                        ], w._weave)
 
232
        
 
233
        self.assertEqual(['f572d396fae9206628714fb2ce00f72e94f2258f'
 
234
                        , '90f265c6e75f1c8f9ab76dcf85528352c5f215ef'
 
235
                        ], w._sha1s)
 
236
        w.check()
 
237
        
 
238
        # Corrupted
 
239
        w._weave[4] = 'There\n'
 
240
        return w
 
241
 
 
242
    def get_file_corrupted_checksum(self):
 
243
        w = self.get_file_corrupted_text()
 
244
        # Corrected
 
245
        w._weave[4] = 'there\n'
 
246
        self.assertEqual('hello\nthere\n', w.get_text('v2'))
 
247
        
 
248
        #Invalid checksum, first digit changed
 
249
        w._sha1s[1] =  'f0f265c6e75f1c8f9ab76dcf85528352c5f215ef'
 
250
        return w
 
251
 
 
252
    def reopen_file(self, name='foo'):
 
253
        return WeaveFile(name, get_transport(self.get_url('.')))
 
254
 
 
255
    def test_no_implicit_create(self):
 
256
        self.assertRaises(errors.NoSuchFile,
 
257
                          WeaveFile,
 
258
                          'foo',
 
259
                          get_transport(self.get_url('.')))
 
260
 
 
261
 
 
262
class TestKnit(TestCaseWithTransport, VersionedFileTestMixIn):
 
263
 
 
264
    def get_file(self, name='foo'):
 
265
        return KnitVersionedFile(name, get_transport(self.get_url('.')),
 
266
                                 delta=True, create=True)
 
267
 
 
268
    def get_file_corrupted_text(self):
 
269
        knit = self.get_file()
 
270
        knit.add_lines('v1', [], ['hello\n'])
 
271
        knit.add_lines('v2', ['v1'], ['hello\n', 'there\n'])
 
272
        return knit
 
273
 
 
274
    def reopen_file(self, name='foo'):
 
275
        return KnitVersionedFile(name, get_transport(self.get_url('.')), delta=True)
 
276
 
 
277
    def test_detection(self):
 
278
        print "TODO for merging: create a corrupted knit."
 
279
        knit = self.get_file()
 
280
        knit.check()
 
281
 
 
282
    def test_no_implicit_create(self):
 
283
        self.assertRaises(errors.NoSuchFile,
 
284
                          KnitVersionedFile,
 
285
                          'foo',
 
286
                          get_transport(self.get_url('.')))
 
287
 
 
288
 
 
289
class InterString(versionedfile.InterVersionedFile):
 
290
    """An inter-versionedfile optimised code path for strings.
 
291
 
 
292
    This is for use during testing where we use strings as versionedfiles
 
293
    so that none of the default regsitered interversionedfile classes will
 
294
    match - which lets us test the match logic.
 
295
    """
 
296
 
 
297
    @staticmethod
 
298
    def is_compatible(source, target):
 
299
        """InterString is compatible with strings-as-versionedfiles."""
 
300
        return isinstance(source, str) and isinstance(target, str)
 
301
 
 
302
 
 
303
# TODO this and the InterRepository core logic should be consolidatable
 
304
# if we make the registry a separate class though we still need to 
 
305
# test the behaviour in the active registry to catch failure-to-handle-
 
306
# stange-objects
 
307
class TestInterVersionedFile(TestCaseWithTransport):
 
308
 
 
309
    def test_get_default_inter_versionedfile(self):
 
310
        # test that the InterVersionedFile.get(a, b) probes
 
311
        # for a class where is_compatible(a, b) returns
 
312
        # true and returns a default interversionedfile otherwise.
 
313
        # This also tests that the default registered optimised interversionedfile
 
314
        # classes do not barf inappropriately when a surprising versionedfile type
 
315
        # is handed to them.
 
316
        dummy_a = "VersionedFile 1."
 
317
        dummy_b = "VersionedFile 2."
 
318
        self.assertGetsDefaultInterVersionedFile(dummy_a, dummy_b)
 
319
 
 
320
    def assertGetsDefaultInterVersionedFile(self, a, b):
 
321
        """Asserts that InterVersionedFile.get(a, b) -> the default."""
 
322
        inter = versionedfile.InterVersionedFile.get(a, b)
 
323
        self.assertEqual(versionedfile.InterVersionedFile,
 
324
                         inter.__class__)
 
325
        self.assertEqual(a, inter.source)
 
326
        self.assertEqual(b, inter.target)
 
327
 
 
328
    def test_register_inter_versionedfile_class(self):
 
329
        # test that a optimised code path provider - a
 
330
        # InterVersionedFile subclass can be registered and unregistered
 
331
        # and that it is correctly selected when given a versionedfile
 
332
        # pair that it returns true on for the is_compatible static method
 
333
        # check
 
334
        dummy_a = "VersionedFile 1."
 
335
        dummy_b = "VersionedFile 2."
 
336
        versionedfile.InterVersionedFile.register_optimiser(InterString)
 
337
        try:
 
338
            # we should get the default for something InterString returns False
 
339
            # to
 
340
            self.assertFalse(InterString.is_compatible(dummy_a, None))
 
341
            self.assertGetsDefaultInterVersionedFile(dummy_a, None)
 
342
            # and we should get an InterString for a pair it 'likes'
 
343
            self.assertTrue(InterString.is_compatible(dummy_a, dummy_b))
 
344
            inter = versionedfile.InterVersionedFile.get(dummy_a, dummy_b)
 
345
            self.assertEqual(InterString, inter.__class__)
 
346
            self.assertEqual(dummy_a, inter.source)
 
347
            self.assertEqual(dummy_b, inter.target)
 
348
        finally:
 
349
            versionedfile.InterVersionedFile.unregister_optimiser(InterString)
 
350
        # now we should get the default InterVersionedFile object again.
 
351
        self.assertGetsDefaultInterVersionedFile(dummy_a, dummy_b)
 
352
 
 
353