/brz/remove-bazaar

To get this branch, use:
bzr branch http://gegoxaren.bato24.eu/bzr/brz/remove-bazaar

« back to all changes in this revision

Viewing changes to bzrlib/tests/test_versionedfile.py

  • Committer: Robert Collins
  • Date: 2006-03-01 02:01:47 UTC
  • mto: (1594.2.4 integration)
  • mto: This revision was merged to the branch mainline in revision 1596.
  • Revision ID: robertc@robertcollins.net-20060301020147-62bb5465f75fbf40
Start check tests for knits (pending), and remove dead code.

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
# Copyright (C) 2005 by Canonical Ltd
 
2
#
 
3
# Authors:
 
4
#   Johan Rydberg <jrydberg@gnu.org>
 
5
#
 
6
# This program is free software; you can redistribute it and/or modify
 
7
# it under the terms of the GNU General Public License as published by
 
8
# the Free Software Foundation; either version 2 of the License, or
 
9
# (at your option) any later version.
 
10
 
 
11
# This program is distributed in the hope that it will be useful,
 
12
# but WITHOUT ANY WARRANTY; without even the implied warranty of
 
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
14
# GNU General Public License for more details.
 
15
 
 
16
# You should have received a copy of the GNU General Public License
 
17
# along with this program; if not, write to the Free Software
 
18
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
 
19
 
 
20
# Remaing to do is to figure out if get_graph should return a simple
 
21
# map, or a graph object of some kind.
 
22
 
 
23
 
 
24
import bzrlib
 
25
import bzrlib.errors as errors
 
26
from bzrlib.errors import RevisionNotPresent, \
 
27
     RevisionAlreadyPresent
 
28
from bzrlib.knit import KnitVersionedFile, \
 
29
     KnitAnnotateFactory
 
30
from bzrlib.tests import TestCaseInTempDir
 
31
from bzrlib.trace import mutter
 
32
from bzrlib.transport.local import LocalTransport
 
33
from bzrlib.weave import Weave
 
34
 
 
35
 
 
36
class VersionedFileTestMixIn(object):
 
37
    """A mixin test class for testing VersionedFiles.
 
38
 
 
39
    This is not an adaptor-style test at this point because
 
40
    theres no dynamic substitution of versioned file implementations,
 
41
    they are strictly controlled by their owning repositories.
 
42
    """
 
43
 
 
44
    def test_add(self):
 
45
        f = self.get_file()
 
46
        f.add_lines('r0', [], ['a\n', 'b\n'])
 
47
        f.add_lines('r1', ['r0'], ['b\n', 'c\n'])
 
48
        versions = f.versions()
 
49
        self.assertTrue('r0' in versions)
 
50
        self.assertTrue('r1' in versions)
 
51
        self.assertEquals(f.get_lines('r0'), ['a\n', 'b\n'])
 
52
        self.assertEquals(f.get_text('r0'), 'a\nb\n')
 
53
        self.assertEquals(f.get_lines('r1'), ['b\n', 'c\n'])
 
54
 
 
55
        self.assertRaises(RevisionNotPresent,
 
56
            f.add_lines, 'r2', ['foo'], [])
 
57
        self.assertRaises(RevisionAlreadyPresent,
 
58
            f.add_lines, 'r1', [], [])
 
59
 
 
60
    def test_ancestry(self):
 
61
        f = self.get_file()
 
62
        f.add_lines('r0', [], ['a\n', 'b\n'])
 
63
        f.add_lines('r1', ['r0'], ['b\n', 'c\n'])
 
64
        f.add_lines('r2', ['r0'], ['b\n', 'c\n'])
 
65
        f.add_lines('r3', ['r2'], ['b\n', 'c\n'])
 
66
        f.add_lines('rM', ['r1', 'r2'], ['b\n', 'c\n'])
 
67
        versions = set(f.get_ancestry(['rM']))
 
68
        self.assertEquals(versions, set(['rM', 'r2', 'r1', 'r0']))
 
69
 
 
70
        self.assertRaises(RevisionNotPresent,
 
71
            f.get_ancestry, ['rM', 'rX'])
 
72
 
 
73
    def test_clone_text(self):
 
74
        f = self.get_file()
 
75
        f.add_lines('r0', [], ['a\n', 'b\n'])
 
76
        f.clone_text('r1', 'r0', ['r0'])
 
77
        self.assertEquals(f.get_lines('r1'), f.get_lines('r0'))
 
78
        self.assertEquals(f.get_lines('r1'), ['a\n', 'b\n'])
 
79
        self.assertEquals(f.get_parents('r1'), ['r0'])
 
80
 
 
81
        self.assertRaises(RevisionNotPresent,
 
82
            f.clone_text, 'r2', 'rX', [])
 
83
        self.assertRaises(RevisionAlreadyPresent,
 
84
            f.clone_text, 'r1', 'r0', [])
 
85
 
 
86
    def test_get_parents(self):
 
87
        f = self.get_file()
 
88
        f.add_lines('r0', [], ['a\n', 'b\n'])
 
89
        f.add_lines('r1', [], ['a\n', 'b\n'])
 
90
        f.add_lines('r2', [], ['a\n', 'b\n'])
 
91
        f.add_lines('r3', [], ['a\n', 'b\n'])
 
92
        f.add_lines('m', ['r0', 'r1', 'r2', 'r3'], ['a\n', 'b\n'])
 
93
        self.assertEquals(f.get_parents('m'), ['r0', 'r1', 'r2', 'r3'])
 
94
 
 
95
        self.assertRaises(RevisionNotPresent,
 
96
            f.get_parents, 'y')
 
97
 
 
98
    def test_annotate(self):
 
99
        f = self.get_file()
 
100
        f.add_lines('r0', [], ['a\n', 'b\n'])
 
101
        f.add_lines('r1', ['r0'], ['c\n', 'b\n'])
 
102
        origins = f.annotate('r1')
 
103
        self.assertEquals(origins[0][0], 'r1')
 
104
        self.assertEquals(origins[1][0], 'r0')
 
105
 
 
106
        self.assertRaises(RevisionNotPresent,
 
107
            f.annotate, 'foo')
 
108
 
 
109
    def test_join(self):
 
110
        f1 = self.get_file('1')
 
111
        f1.add_lines('r0', [], ['a\n', 'b\n'])
 
112
        f1.add_lines('r1', ['r0'], ['c\n', 'b\n'])
 
113
        f2 = self.get_file('2')
 
114
        f2.join(f1, None)
 
115
        self.assertTrue(f2.has_version('r0'))
 
116
        self.assertTrue(f2.has_version('r1'))
 
117
 
 
118
        self.assertRaises(RevisionNotPresent,
 
119
            f2.join, f1, version_ids=['r3'])
 
120
 
 
121
 
 
122
        #f3 = self.get_file('1')
 
123
        #f3.add_lines('r0', ['a\n', 'b\n'], [])
 
124
        #f3.add_lines('r1', ['c\n', 'b\n'], ['r0'])
 
125
        #f4 = self.get_file('2')
 
126
        #f4.join(f3, ['r0'])
 
127
        #self.assertTrue(f4.has_version('r0'))
 
128
        #self.assertFalse(f4.has_version('r1'))
 
129
 
 
130
    def test_walk(self):
 
131
        f = self.get_file('1')
 
132
        f.add_lines('r0', [], ['a\n', 'b\n'])
 
133
        f.add_lines('r1', ['r0'], ['c\n', 'b\n'])
 
134
        f.add_lines('rX', ['r1'], ['d\n', 'b\n'])
 
135
        f.add_lines('rY', ['r1'], ['c\n', 'e\n'])
 
136
 
 
137
        lines = {}
 
138
        for lineno, insert, dset, text in f.walk(['rX', 'rY']):
 
139
            lines[text] = (insert, dset)
 
140
 
 
141
        self.assertTrue(lines['a\n'], ('r0', set(['r1'])))
 
142
        self.assertTrue(lines['b\n'], ('r0', set(['rY'])))
 
143
        self.assertTrue(lines['c\n'], ('r1', set(['rX'])))
 
144
        self.assertTrue(lines['d\n'], ('rX', set([])))
 
145
        self.assertTrue(lines['e\n'], ('rY', set([])))
 
146
 
 
147
    def test_detection(self):
 
148
        # Test weaves detect corruption.
 
149
        #
 
150
        # Weaves contain a checksum of their texts.
 
151
        # When a text is extracted, this checksum should be
 
152
        # verified.
 
153
 
 
154
        w = self.get_file_corrupted_text()
 
155
 
 
156
        self.assertEqual('hello\n', w.get_text('v1'))
 
157
        self.assertRaises(errors.WeaveInvalidChecksum, w.get_text, 'v2')
 
158
        self.assertRaises(errors.WeaveInvalidChecksum, w.get_lines, 'v2')
 
159
        self.assertRaises(errors.WeaveInvalidChecksum, list, w.get_iter('v2'))
 
160
        self.assertRaises(errors.WeaveInvalidChecksum, w.check)
 
161
 
 
162
        w = self.get_file_corrupted_checksum()
 
163
 
 
164
        self.assertEqual('hello\n', w.get_text('v1'))
 
165
        self.assertRaises(errors.WeaveInvalidChecksum, w.get_text, 'v2')
 
166
        self.assertRaises(errors.WeaveInvalidChecksum, w.get_lines, 'v2')
 
167
        self.assertRaises(errors.WeaveInvalidChecksum, list, w.get_iter('v2'))
 
168
        self.assertRaises(errors.WeaveInvalidChecksum, w.check)
 
169
 
 
170
 
 
171
    def get_file_corrupted_text(self):
 
172
        """Return a versioned file with corrupt text but valid metadata."""
 
173
        raise NotImplementedError(self.get_file_corrupted_text)
 
174
 
 
175
 
 
176
class TestWeave(TestCaseInTempDir, VersionedFileTestMixIn):
 
177
 
 
178
    def get_file(self, name='foo'):
 
179
        return Weave(name)
 
180
 
 
181
    def get_file_corrupted_text(self):
 
182
        w = Weave()
 
183
        w.add('v1', [], ['hello\n'])
 
184
        w.add('v2', ['v1'], ['hello\n', 'there\n'])
 
185
        
 
186
        # We are going to invasively corrupt the text
 
187
        # Make sure the internals of weave are the same
 
188
        self.assertEqual([('{', 0)
 
189
                        , 'hello\n'
 
190
                        , ('}', None)
 
191
                        , ('{', 1)
 
192
                        , 'there\n'
 
193
                        , ('}', None)
 
194
                        ], w._weave)
 
195
        
 
196
        self.assertEqual(['f572d396fae9206628714fb2ce00f72e94f2258f'
 
197
                        , '90f265c6e75f1c8f9ab76dcf85528352c5f215ef'
 
198
                        ], w._sha1s)
 
199
        w.check()
 
200
        
 
201
        # Corrupted
 
202
        w._weave[4] = 'There\n'
 
203
        return w
 
204
 
 
205
    def get_file_corrupted_checksum(self):
 
206
        w = self.get_file_corrupted_text()
 
207
        # Corrected
 
208
        w._weave[4] = 'there\n'
 
209
        self.assertEqual('hello\nthere\n', w.get_text('v2'))
 
210
        
 
211
        #Invalid checksum, first digit changed
 
212
        w._sha1s[1] =  'f0f265c6e75f1c8f9ab76dcf85528352c5f215ef'
 
213
        return w
 
214
 
 
215
 
 
216
class TestKnit(TestCaseInTempDir, VersionedFileTestMixIn):
 
217
 
 
218
    def get_file(self, name='foo'):
 
219
        return KnitVersionedFile(LocalTransport('.'),
 
220
            name, 'w', KnitAnnotateFactory(), delta=True)
 
221
 
 
222
    def get_file_corrupted_text(self):
 
223
        knit = self.get_file()
 
224
        knit.add_lines('v1', [], ['hello\n'])
 
225
        knit.add_lines('v2', ['v1'], ['hello\n', 'there\n'])
 
226
        return knit