/brz/remove-bazaar

To get this branch, use:
bzr branch http://gegoxaren.bato24.eu/bzr/brz/remove-bazaar

« back to all changes in this revision

Viewing changes to bzrlib/tests/test_versionedfile.py

  • Committer: Robert Collins
  • Date: 2006-04-18 02:50:25 UTC
  • mto: (1711.1.1 integration)
  • mto: This revision was merged to the branch mainline in revision 1667.
  • Revision ID: robertc@robertcollins.net-20060418025025-0b00ecca60b5d0db
Add trivial http-using test for versioned files.

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
# Copyright (C) 2005 by Canonical Ltd
 
2
#
 
3
# Authors:
 
4
#   Johan Rydberg <jrydberg@gnu.org>
 
5
#
 
6
# This program is free software; you can redistribute it and/or modify
 
7
# it under the terms of the GNU General Public License as published by
 
8
# the Free Software Foundation; either version 2 of the License, or
 
9
# (at your option) any later version.
 
10
 
 
11
# This program is distributed in the hope that it will be useful,
 
12
# but WITHOUT ANY WARRANTY; without even the implied warranty of
 
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
14
# GNU General Public License for more details.
 
15
 
 
16
# You should have received a copy of the GNU General Public License
 
17
# along with this program; if not, write to the Free Software
 
18
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
 
19
 
 
20
 
 
21
import bzrlib
 
22
import bzrlib.errors as errors
 
23
from bzrlib.errors import (
 
24
                           RevisionNotPresent, 
 
25
                           RevisionAlreadyPresent,
 
26
                           WeaveParentMismatch
 
27
                           )
 
28
from bzrlib.knit import KnitVersionedFile, \
 
29
     KnitAnnotateFactory
 
30
from bzrlib.tests import TestCaseWithTransport
 
31
from bzrlib.tests.HTTPTestUtil import TestCaseWithWebserver
 
32
from bzrlib.trace import mutter
 
33
from bzrlib.transport import get_transport
 
34
from bzrlib.transport.memory import MemoryTransport
 
35
import bzrlib.versionedfile as versionedfile
 
36
from bzrlib.weave import WeaveFile
 
37
from bzrlib.weavefile import read_weave
 
38
 
 
39
 
 
40
class VersionedFileTestMixIn(object):
 
41
    """A mixin test class for testing VersionedFiles.
 
42
 
 
43
    This is not an adaptor-style test at this point because
 
44
    theres no dynamic substitution of versioned file implementations,
 
45
    they are strictly controlled by their owning repositories.
 
46
    """
 
47
 
 
48
    def test_add(self):
 
49
        f = self.get_file()
 
50
        f.add_lines('r0', [], ['a\n', 'b\n'])
 
51
        f.add_lines('r1', ['r0'], ['b\n', 'c\n'])
 
52
        def verify_file(f):
 
53
            versions = f.versions()
 
54
            self.assertTrue('r0' in versions)
 
55
            self.assertTrue('r1' in versions)
 
56
            self.assertEquals(f.get_lines('r0'), ['a\n', 'b\n'])
 
57
            self.assertEquals(f.get_text('r0'), 'a\nb\n')
 
58
            self.assertEquals(f.get_lines('r1'), ['b\n', 'c\n'])
 
59
            self.assertEqual(2, len(f))
 
60
            self.assertEqual(2, f.num_versions())
 
61
    
 
62
            self.assertRaises(RevisionNotPresent,
 
63
                f.add_lines, 'r2', ['foo'], [])
 
64
            self.assertRaises(RevisionAlreadyPresent,
 
65
                f.add_lines, 'r1', [], [])
 
66
        verify_file(f)
 
67
        f = self.reopen_file()
 
68
        verify_file(f)
 
69
 
 
70
    def test_adds_with_parent_texts(self):
 
71
        f = self.get_file()
 
72
        parent_texts = {}
 
73
        parent_texts['r0'] = f.add_lines('r0', [], ['a\n', 'b\n'])
 
74
        try:
 
75
            parent_texts['r1'] = f.add_lines_with_ghosts('r1',
 
76
                                                         ['r0', 'ghost'], 
 
77
                                                         ['b\n', 'c\n'],
 
78
                                                         parent_texts=parent_texts)
 
79
        except NotImplementedError:
 
80
            # if the format doesn't support ghosts, just add normally.
 
81
            parent_texts['r1'] = f.add_lines('r1',
 
82
                                             ['r0'], 
 
83
                                             ['b\n', 'c\n'],
 
84
                                             parent_texts=parent_texts)
 
85
        f.add_lines('r2', ['r1'], ['c\n', 'd\n'], parent_texts=parent_texts)
 
86
        self.assertNotEqual(None, parent_texts['r0'])
 
87
        self.assertNotEqual(None, parent_texts['r1'])
 
88
        def verify_file(f):
 
89
            versions = f.versions()
 
90
            self.assertTrue('r0' in versions)
 
91
            self.assertTrue('r1' in versions)
 
92
            self.assertTrue('r2' in versions)
 
93
            self.assertEquals(f.get_lines('r0'), ['a\n', 'b\n'])
 
94
            self.assertEquals(f.get_lines('r1'), ['b\n', 'c\n'])
 
95
            self.assertEquals(f.get_lines('r2'), ['c\n', 'd\n'])
 
96
            self.assertEqual(3, f.num_versions())
 
97
            origins = f.annotate('r1')
 
98
            self.assertEquals(origins[0][0], 'r0')
 
99
            self.assertEquals(origins[1][0], 'r1')
 
100
            origins = f.annotate('r2')
 
101
            self.assertEquals(origins[0][0], 'r1')
 
102
            self.assertEquals(origins[1][0], 'r2')
 
103
 
 
104
        verify_file(f)
 
105
        f = self.reopen_file()
 
106
        verify_file(f)
 
107
 
 
108
    def test_get_delta(self):
 
109
        f = self.get_file()
 
110
        sha1s = self._setup_for_deltas(f)
 
111
        expected_delta = (None, '6bfa09d82ce3e898ad4641ae13dd4fdb9cf0d76b', False, 
 
112
                          [(0, 0, 1, [('base', 'line\n')])])
 
113
        self.assertEqual(expected_delta, f.get_delta('base'))
 
114
        next_parent = 'base'
 
115
        text_name = 'chain1-'
 
116
        for depth in range(26):
 
117
            new_version = text_name + '%s' % depth
 
118
            expected_delta = (next_parent, sha1s[depth], 
 
119
                              False,
 
120
                              [(depth + 1, depth + 1, 1, [(new_version, 'line\n')])])
 
121
            self.assertEqual(expected_delta, f.get_delta(new_version))
 
122
            next_parent = new_version
 
123
        next_parent = 'base'
 
124
        text_name = 'chain2-'
 
125
        for depth in range(26):
 
126
            new_version = text_name + '%s' % depth
 
127
            expected_delta = (next_parent, sha1s[depth], False,
 
128
                              [(depth + 1, depth + 1, 1, [(new_version, 'line\n')])])
 
129
            self.assertEqual(expected_delta, f.get_delta(new_version))
 
130
            next_parent = new_version
 
131
        # smoke test for eol support
 
132
        expected_delta = ('base', '264f39cab871e4cfd65b3a002f7255888bb5ed97', True, [])
 
133
        self.assertEqual(['line'], f.get_lines('noeol'))
 
134
        self.assertEqual(expected_delta, f.get_delta('noeol'))
 
135
 
 
136
    def test_get_deltas(self):
 
137
        f = self.get_file()
 
138
        sha1s = self._setup_for_deltas(f)
 
139
        deltas = f.get_deltas(f.versions())
 
140
        expected_delta = (None, '6bfa09d82ce3e898ad4641ae13dd4fdb9cf0d76b', False, 
 
141
                          [(0, 0, 1, [('base', 'line\n')])])
 
142
        self.assertEqual(expected_delta, deltas['base'])
 
143
        next_parent = 'base'
 
144
        text_name = 'chain1-'
 
145
        for depth in range(26):
 
146
            new_version = text_name + '%s' % depth
 
147
            expected_delta = (next_parent, sha1s[depth], 
 
148
                              False,
 
149
                              [(depth + 1, depth + 1, 1, [(new_version, 'line\n')])])
 
150
            self.assertEqual(expected_delta, deltas[new_version])
 
151
            next_parent = new_version
 
152
        next_parent = 'base'
 
153
        text_name = 'chain2-'
 
154
        for depth in range(26):
 
155
            new_version = text_name + '%s' % depth
 
156
            expected_delta = (next_parent, sha1s[depth], False,
 
157
                              [(depth + 1, depth + 1, 1, [(new_version, 'line\n')])])
 
158
            self.assertEqual(expected_delta, deltas[new_version])
 
159
            next_parent = new_version
 
160
        # smoke tests for eol support
 
161
        expected_delta = ('base', '264f39cab871e4cfd65b3a002f7255888bb5ed97', True, [])
 
162
        self.assertEqual(['line'], f.get_lines('noeol'))
 
163
        self.assertEqual(expected_delta, deltas['noeol'])
 
164
        # smoke tests for eol support - two noeol in a row same content
 
165
        expected_deltas = (('noeol', '3ad7ee82dbd8f29ecba073f96e43e414b3f70a4d', True, 
 
166
                          [(0, 1, 2, [(u'noeolsecond', 'line\n'), (u'noeolsecond', 'line\n')])]),
 
167
                          ('noeol', '3ad7ee82dbd8f29ecba073f96e43e414b3f70a4d', True, 
 
168
                           [(0, 0, 1, [('noeolsecond', 'line\n')]), (1, 1, 0, [])]))
 
169
        self.assertEqual(['line\n', 'line'], f.get_lines('noeolsecond'))
 
170
        self.assertTrue(deltas['noeolsecond'] in expected_deltas)
 
171
        # two no-eol in a row, different content
 
172
        expected_delta = ('noeolsecond', '8bb553a84e019ef1149db082d65f3133b195223b', True, 
 
173
                          [(1, 2, 1, [(u'noeolnotshared', 'phone\n')])])
 
174
        self.assertEqual(['line\n', 'phone'], f.get_lines('noeolnotshared'))
 
175
        self.assertEqual(expected_delta, deltas['noeolnotshared'])
 
176
        # eol folling a no-eol with content change
 
177
        expected_delta = ('noeol', 'a61f6fb6cfc4596e8d88c34a308d1e724caf8977', False, 
 
178
                          [(0, 1, 1, [(u'eol', 'phone\n')])])
 
179
        self.assertEqual(['phone\n'], f.get_lines('eol'))
 
180
        self.assertEqual(expected_delta, deltas['eol'])
 
181
        # eol folling a no-eol with content change
 
182
        expected_delta = ('noeol', '6bfa09d82ce3e898ad4641ae13dd4fdb9cf0d76b', False, 
 
183
                          [(0, 1, 1, [(u'eolline', 'line\n')])])
 
184
        self.assertEqual(['line\n'], f.get_lines('eolline'))
 
185
        self.assertEqual(expected_delta, deltas['eolline'])
 
186
        # eol with no parents
 
187
        expected_delta = (None, '264f39cab871e4cfd65b3a002f7255888bb5ed97', True, 
 
188
                          [(0, 0, 1, [(u'noeolbase', 'line\n')])])
 
189
        self.assertEqual(['line'], f.get_lines('noeolbase'))
 
190
        self.assertEqual(expected_delta, deltas['noeolbase'])
 
191
        # eol with two parents, in inverse insertion order
 
192
        expected_deltas = (('noeolbase', '264f39cab871e4cfd65b3a002f7255888bb5ed97', True,
 
193
                            [(0, 1, 1, [(u'eolbeforefirstparent', 'line\n')])]),
 
194
                           ('noeolbase', '264f39cab871e4cfd65b3a002f7255888bb5ed97', True,
 
195
                            [(0, 1, 1, [(u'eolbeforefirstparent', 'line\n')])]))
 
196
        self.assertEqual(['line'], f.get_lines('eolbeforefirstparent'))
 
197
        #self.assertTrue(deltas['eolbeforefirstparent'] in expected_deltas)
 
198
 
 
199
    def _setup_for_deltas(self, f):
 
200
        self.assertRaises(errors.RevisionNotPresent, f.get_delta, 'base')
 
201
        # add texts that should trip the knit maximum delta chain threshold
 
202
        # as well as doing parallel chains of data in knits.
 
203
        # this is done by two chains of 25 insertions
 
204
        f.add_lines('base', [], ['line\n'])
 
205
        f.add_lines('noeol', ['base'], ['line'])
 
206
        # detailed eol tests:
 
207
        # shared last line with parent no-eol
 
208
        f.add_lines('noeolsecond', ['noeol'], ['line\n', 'line'])
 
209
        # differing last line with parent, both no-eol
 
210
        f.add_lines('noeolnotshared', ['noeolsecond'], ['line\n', 'phone'])
 
211
        # add eol following a noneol parent, change content
 
212
        f.add_lines('eol', ['noeol'], ['phone\n'])
 
213
        # add eol following a noneol parent, no change content
 
214
        f.add_lines('eolline', ['noeol'], ['line\n'])
 
215
        # noeol with no parents:
 
216
        f.add_lines('noeolbase', [], ['line'])
 
217
        # noeol preceeding its leftmost parent in the output:
 
218
        # this is done by making it a merge of two parents with no common
 
219
        # anestry: noeolbase and noeol with the 
 
220
        # later-inserted parent the leftmost.
 
221
        f.add_lines('eolbeforefirstparent', ['noeolbase', 'noeol'], ['line'])
 
222
        # two identical eol texts
 
223
        f.add_lines('noeoldup', ['noeol'], ['line'])
 
224
        next_parent = 'base'
 
225
        text_name = 'chain1-'
 
226
        text = ['line\n']
 
227
        sha1s = {0 :'da6d3141cb4a5e6f464bf6e0518042ddc7bfd079',
 
228
                 1 :'45e21ea146a81ea44a821737acdb4f9791c8abe7',
 
229
                 2 :'e1f11570edf3e2a070052366c582837a4fe4e9fa',
 
230
                 3 :'26b4b8626da827088c514b8f9bbe4ebf181edda1',
 
231
                 4 :'e28a5510be25ba84d31121cff00956f9970ae6f6',
 
232
                 5 :'d63ec0ce22e11dcf65a931b69255d3ac747a318d',
 
233
                 6 :'2c2888d288cb5e1d98009d822fedfe6019c6a4ea',
 
234
                 7 :'95c14da9cafbf828e3e74a6f016d87926ba234ab',
 
235
                 8 :'779e9a0b28f9f832528d4b21e17e168c67697272',
 
236
                 9 :'1f8ff4e5c6ff78ac106fcfe6b1e8cb8740ff9a8f',
 
237
                 10:'131a2ae712cf51ed62f143e3fbac3d4206c25a05',
 
238
                 11:'c5a9d6f520d2515e1ec401a8f8a67e6c3c89f199',
 
239
                 12:'31a2286267f24d8bedaa43355f8ad7129509ea85',
 
240
                 13:'dc2a7fe80e8ec5cae920973973a8ee28b2da5e0a',
 
241
                 14:'2c4b1736566b8ca6051e668de68650686a3922f2',
 
242
                 15:'5912e4ecd9b0c07be4d013e7e2bdcf9323276cde',
 
243
                 16:'b0d2e18d3559a00580f6b49804c23fea500feab3',
 
244
                 17:'8e1d43ad72f7562d7cb8f57ee584e20eb1a69fc7',
 
245
                 18:'5cf64a3459ae28efa60239e44b20312d25b253f3',
 
246
                 19:'1ebed371807ba5935958ad0884595126e8c4e823',
 
247
                 20:'2aa62a8b06fb3b3b892a3292a068ade69d5ee0d3',
 
248
                 21:'01edc447978004f6e4e962b417a4ae1955b6fe5d',
 
249
                 22:'d8d8dc49c4bf0bab401e0298bb5ad827768618bb',
 
250
                 23:'c21f62b1c482862983a8ffb2b0c64b3451876e3f',
 
251
                 24:'c0593fe795e00dff6b3c0fe857a074364d5f04fc',
 
252
                 25:'dd1a1cf2ba9cc225c3aff729953e6364bf1d1855',
 
253
                 }
 
254
        for depth in range(26):
 
255
            new_version = text_name + '%s' % depth
 
256
            text = text + ['line\n']
 
257
            f.add_lines(new_version, [next_parent], text)
 
258
            next_parent = new_version
 
259
        next_parent = 'base'
 
260
        text_name = 'chain2-'
 
261
        text = ['line\n']
 
262
        for depth in range(26):
 
263
            new_version = text_name + '%s' % depth
 
264
            text = text + ['line\n']
 
265
            f.add_lines(new_version, [next_parent], text)
 
266
            next_parent = new_version
 
267
        return sha1s
 
268
 
 
269
    def test_add_delta(self):
 
270
        # tests for the add-delta facility.
 
271
        # at this point, optimising for speed, we assume no checks when deltas are inserted.
 
272
        # this may need to be revisited.
 
273
        source = self.get_file('source')
 
274
        source.add_lines('base', [], ['line\n'])
 
275
        next_parent = 'base'
 
276
        text_name = 'chain1-'
 
277
        text = ['line\n']
 
278
        for depth in range(26):
 
279
            new_version = text_name + '%s' % depth
 
280
            text = text + ['line\n']
 
281
            source.add_lines(new_version, [next_parent], text)
 
282
            next_parent = new_version
 
283
        next_parent = 'base'
 
284
        text_name = 'chain2-'
 
285
        text = ['line\n']
 
286
        for depth in range(26):
 
287
            new_version = text_name + '%s' % depth
 
288
            text = text + ['line\n']
 
289
            source.add_lines(new_version, [next_parent], text)
 
290
            next_parent = new_version
 
291
        source.add_lines('noeol', ['base'], ['line'])
 
292
        
 
293
        target = self.get_file('target')
 
294
        for version in source.versions():
 
295
            parent, sha1, noeol, delta = source.get_delta(version)
 
296
            target.add_delta(version,
 
297
                             source.get_parents(version),
 
298
                             parent,
 
299
                             sha1,
 
300
                             noeol,
 
301
                             delta)
 
302
        self.assertRaises(RevisionAlreadyPresent,
 
303
                          target.add_delta, 'base', [], None, '', False, [])
 
304
        for version in source.versions():
 
305
            self.assertEqual(source.get_lines(version),
 
306
                             target.get_lines(version))
 
307
 
 
308
    def test_ancestry(self):
 
309
        f = self.get_file()
 
310
        self.assertEqual([], f.get_ancestry([]))
 
311
        f.add_lines('r0', [], ['a\n', 'b\n'])
 
312
        f.add_lines('r1', ['r0'], ['b\n', 'c\n'])
 
313
        f.add_lines('r2', ['r0'], ['b\n', 'c\n'])
 
314
        f.add_lines('r3', ['r2'], ['b\n', 'c\n'])
 
315
        f.add_lines('rM', ['r1', 'r2'], ['b\n', 'c\n'])
 
316
        self.assertEqual([], f.get_ancestry([]))
 
317
        versions = f.get_ancestry(['rM'])
 
318
        # there are some possibilities:
 
319
        # r0 r1 r2 rM r3
 
320
        # r0 r1 r2 r3 rM
 
321
        # etc
 
322
        # so we check indexes
 
323
        r0 = versions.index('r0')
 
324
        r1 = versions.index('r1')
 
325
        r2 = versions.index('r2')
 
326
        self.assertFalse('r3' in versions)
 
327
        rM = versions.index('rM')
 
328
        self.assertTrue(r0 < r1)
 
329
        self.assertTrue(r0 < r2)
 
330
        self.assertTrue(r1 < rM)
 
331
        self.assertTrue(r2 < rM)
 
332
 
 
333
        self.assertRaises(RevisionNotPresent,
 
334
            f.get_ancestry, ['rM', 'rX'])
 
335
 
 
336
    def test_mutate_after_finish(self):
 
337
        f = self.get_file()
 
338
        f.transaction_finished()
 
339
        self.assertRaises(errors.OutSideTransaction, f.add_delta, '', [], '', '', False, [])
 
340
        self.assertRaises(errors.OutSideTransaction, f.add_lines, '', [], [])
 
341
        self.assertRaises(errors.OutSideTransaction, f.add_lines_with_ghosts, '', [], [])
 
342
        self.assertRaises(errors.OutSideTransaction, f.fix_parents, '', [])
 
343
        self.assertRaises(errors.OutSideTransaction, f.join, '')
 
344
        self.assertRaises(errors.OutSideTransaction, f.clone_text, 'base', 'bar', ['foo'])
 
345
        
 
346
    def test_clear_cache(self):
 
347
        f = self.get_file()
 
348
        # on a new file it should not error
 
349
        f.clear_cache()
 
350
        # and after adding content, doing a clear_cache and a get should work.
 
351
        f.add_lines('0', [], ['a'])
 
352
        f.clear_cache()
 
353
        self.assertEqual(['a'], f.get_lines('0'))
 
354
 
 
355
    def test_clone_text(self):
 
356
        f = self.get_file()
 
357
        f.add_lines('r0', [], ['a\n', 'b\n'])
 
358
        f.clone_text('r1', 'r0', ['r0'])
 
359
        def verify_file(f):
 
360
            self.assertEquals(f.get_lines('r1'), f.get_lines('r0'))
 
361
            self.assertEquals(f.get_lines('r1'), ['a\n', 'b\n'])
 
362
            self.assertEquals(f.get_parents('r1'), ['r0'])
 
363
    
 
364
            self.assertRaises(RevisionNotPresent,
 
365
                f.clone_text, 'r2', 'rX', [])
 
366
            self.assertRaises(RevisionAlreadyPresent,
 
367
                f.clone_text, 'r1', 'r0', [])
 
368
        verify_file(f)
 
369
        verify_file(self.reopen_file())
 
370
 
 
371
    def test_create_empty(self):
 
372
        f = self.get_file()
 
373
        f.add_lines('0', [], ['a\n'])
 
374
        new_f = f.create_empty('t', MemoryTransport())
 
375
        # smoke test, specific types should check it is honoured correctly for
 
376
        # non type attributes
 
377
        self.assertEqual([], new_f.versions())
 
378
        self.assertTrue(isinstance(new_f, f.__class__))
 
379
 
 
380
    def test_copy_to(self):
 
381
        f = self.get_file()
 
382
        f.add_lines('0', [], ['a\n'])
 
383
        t = MemoryTransport()
 
384
        f.copy_to('foo', t)
 
385
        for suffix in f.__class__.get_suffixes():
 
386
            self.assertTrue(t.has('foo' + suffix))
 
387
 
 
388
    def test_get_suffixes(self):
 
389
        f = self.get_file()
 
390
        # should be the same
 
391
        self.assertEqual(f.__class__.get_suffixes(), f.__class__.get_suffixes())
 
392
        # and should be a list
 
393
        self.assertTrue(isinstance(f.__class__.get_suffixes(), list))
 
394
 
 
395
    def test_get_graph(self):
 
396
        f = self.get_file()
 
397
        f.add_lines('v1', [], ['hello\n'])
 
398
        f.add_lines('v2', ['v1'], ['hello\n', 'world\n'])
 
399
        f.add_lines('v3', ['v2'], ['hello\n', 'cruel\n', 'world\n'])
 
400
        self.assertEqual({'v1': [],
 
401
                          'v2': ['v1'],
 
402
                          'v3': ['v2']},
 
403
                         f.get_graph())
 
404
 
 
405
    def test_get_parents(self):
 
406
        f = self.get_file()
 
407
        f.add_lines('r0', [], ['a\n', 'b\n'])
 
408
        f.add_lines('r1', [], ['a\n', 'b\n'])
 
409
        f.add_lines('r2', [], ['a\n', 'b\n'])
 
410
        f.add_lines('r3', [], ['a\n', 'b\n'])
 
411
        f.add_lines('m', ['r0', 'r1', 'r2', 'r3'], ['a\n', 'b\n'])
 
412
        self.assertEquals(f.get_parents('m'), ['r0', 'r1', 'r2', 'r3'])
 
413
 
 
414
        self.assertRaises(RevisionNotPresent,
 
415
            f.get_parents, 'y')
 
416
 
 
417
    def test_annotate(self):
 
418
        f = self.get_file()
 
419
        f.add_lines('r0', [], ['a\n', 'b\n'])
 
420
        f.add_lines('r1', ['r0'], ['c\n', 'b\n'])
 
421
        origins = f.annotate('r1')
 
422
        self.assertEquals(origins[0][0], 'r1')
 
423
        self.assertEquals(origins[1][0], 'r0')
 
424
 
 
425
        self.assertRaises(RevisionNotPresent,
 
426
            f.annotate, 'foo')
 
427
 
 
428
    def test_walk(self):
 
429
        # tests that walk returns all the inclusions for the requested
 
430
        # revisions as well as the revisions changes themselves.
 
431
        f = self.get_file('1')
 
432
        f.add_lines('r0', [], ['a\n', 'b\n'])
 
433
        f.add_lines('r1', ['r0'], ['c\n', 'b\n'])
 
434
        f.add_lines('rX', ['r1'], ['d\n', 'b\n'])
 
435
        f.add_lines('rY', ['r1'], ['c\n', 'e\n'])
 
436
 
 
437
        lines = {}
 
438
        for lineno, insert, dset, text in f.walk(['rX', 'rY']):
 
439
            lines[text] = (insert, dset)
 
440
 
 
441
        self.assertTrue(lines['a\n'], ('r0', set(['r1'])))
 
442
        self.assertTrue(lines['b\n'], ('r0', set(['rY'])))
 
443
        self.assertTrue(lines['c\n'], ('r1', set(['rX'])))
 
444
        self.assertTrue(lines['d\n'], ('rX', set([])))
 
445
        self.assertTrue(lines['e\n'], ('rY', set([])))
 
446
 
 
447
    def test_detection(self):
 
448
        # Test weaves detect corruption.
 
449
        #
 
450
        # Weaves contain a checksum of their texts.
 
451
        # When a text is extracted, this checksum should be
 
452
        # verified.
 
453
 
 
454
        w = self.get_file_corrupted_text()
 
455
 
 
456
        self.assertEqual('hello\n', w.get_text('v1'))
 
457
        self.assertRaises(errors.WeaveInvalidChecksum, w.get_text, 'v2')
 
458
        self.assertRaises(errors.WeaveInvalidChecksum, w.get_lines, 'v2')
 
459
        self.assertRaises(errors.WeaveInvalidChecksum, w.check)
 
460
 
 
461
        w = self.get_file_corrupted_checksum()
 
462
 
 
463
        self.assertEqual('hello\n', w.get_text('v1'))
 
464
        self.assertRaises(errors.WeaveInvalidChecksum, w.get_text, 'v2')
 
465
        self.assertRaises(errors.WeaveInvalidChecksum, w.get_lines, 'v2')
 
466
        self.assertRaises(errors.WeaveInvalidChecksum, w.check)
 
467
 
 
468
    def get_file_corrupted_text(self):
 
469
        """Return a versioned file with corrupt text but valid metadata."""
 
470
        raise NotImplementedError(self.get_file_corrupted_text)
 
471
 
 
472
    def reopen_file(self, name='foo'):
 
473
        """Open the versioned file from disk again."""
 
474
        raise NotImplementedError(self.reopen_file)
 
475
 
 
476
    def test_iter_lines_added_or_present_in_versions(self):
 
477
        # test that we get at least an equalset of the lines added by
 
478
        # versions in the weave 
 
479
        # the ordering here is to make a tree so that dumb searches have
 
480
        # more changes to muck up.
 
481
        vf = self.get_file()
 
482
        # add a base to get included
 
483
        vf.add_lines('base', [], ['base\n'])
 
484
        # add a ancestor to be included on one side
 
485
        vf.add_lines('lancestor', [], ['lancestor\n'])
 
486
        # add a ancestor to be included on the other side
 
487
        vf.add_lines('rancestor', ['base'], ['rancestor\n'])
 
488
        # add a child of rancestor with no eofile-nl
 
489
        vf.add_lines('child', ['rancestor'], ['base\n', 'child\n'])
 
490
        # add a child of lancestor and base to join the two roots
 
491
        vf.add_lines('otherchild',
 
492
                     ['lancestor', 'base'],
 
493
                     ['base\n', 'lancestor\n', 'otherchild\n'])
 
494
        def iter_with_versions(versions):
 
495
            # now we need to see what lines are returned, and how often.
 
496
            lines = {'base\n':0,
 
497
                     'lancestor\n':0,
 
498
                     'rancestor\n':0,
 
499
                     'child\n':0,
 
500
                     'otherchild\n':0,
 
501
                     }
 
502
            # iterate over the lines
 
503
            for line in vf.iter_lines_added_or_present_in_versions(versions):
 
504
                lines[line] += 1
 
505
            return lines
 
506
        lines = iter_with_versions(['child', 'otherchild'])
 
507
        # we must see child and otherchild
 
508
        self.assertTrue(lines['child\n'] > 0)
 
509
        self.assertTrue(lines['otherchild\n'] > 0)
 
510
        # we dont care if we got more than that.
 
511
        
 
512
        # test all lines
 
513
        lines = iter_with_versions(None)
 
514
        # all lines must be seen at least once
 
515
        self.assertTrue(lines['base\n'] > 0)
 
516
        self.assertTrue(lines['lancestor\n'] > 0)
 
517
        self.assertTrue(lines['rancestor\n'] > 0)
 
518
        self.assertTrue(lines['child\n'] > 0)
 
519
        self.assertTrue(lines['otherchild\n'] > 0)
 
520
 
 
521
    def test_fix_parents(self):
 
522
        # some versioned files allow incorrect parents to be corrected after
 
523
        # insertion - this may not fix ancestry..
 
524
        # if they do not supported, they just do not implement it.
 
525
        # we test this as an interface test to ensure that those that *do*
 
526
        # implementent it get it right.
 
527
        vf = self.get_file()
 
528
        vf.add_lines('notbase', [], [])
 
529
        vf.add_lines('base', [], [])
 
530
        try:
 
531
            vf.fix_parents('notbase', ['base'])
 
532
        except NotImplementedError:
 
533
            return
 
534
        self.assertEqual(['base'], vf.get_parents('notbase'))
 
535
        # open again, check it stuck.
 
536
        vf = self.get_file()
 
537
        self.assertEqual(['base'], vf.get_parents('notbase'))
 
538
 
 
539
    def test_fix_parents_with_ghosts(self):
 
540
        # when fixing parents, ghosts that are listed should not be ghosts
 
541
        # anymore.
 
542
        vf = self.get_file()
 
543
 
 
544
        try:
 
545
            vf.add_lines_with_ghosts('notbase', ['base', 'stillghost'], [])
 
546
        except NotImplementedError:
 
547
            return
 
548
        vf.add_lines('base', [], [])
 
549
        vf.fix_parents('notbase', ['base', 'stillghost'])
 
550
        self.assertEqual(['base'], vf.get_parents('notbase'))
 
551
        # open again, check it stuck.
 
552
        vf = self.get_file()
 
553
        self.assertEqual(['base'], vf.get_parents('notbase'))
 
554
        # and check the ghosts
 
555
        self.assertEqual(['base', 'stillghost'],
 
556
                         vf.get_parents_with_ghosts('notbase'))
 
557
 
 
558
    def test_add_lines_with_ghosts(self):
 
559
        # some versioned file formats allow lines to be added with parent
 
560
        # information that is > than that in the format. Formats that do
 
561
        # not support this need to raise NotImplementedError on the
 
562
        # add_lines_with_ghosts api.
 
563
        vf = self.get_file()
 
564
        # add a revision with ghost parents
 
565
        try:
 
566
            vf.add_lines_with_ghosts(u'notbxbfse', [u'b\xbfse'], [])
 
567
        except NotImplementedError:
 
568
            # check the other ghost apis are also not implemented
 
569
            self.assertRaises(NotImplementedError, vf.has_ghost, 'foo')
 
570
            self.assertRaises(NotImplementedError, vf.get_ancestry_with_ghosts, ['foo'])
 
571
            self.assertRaises(NotImplementedError, vf.get_parents_with_ghosts, 'foo')
 
572
            self.assertRaises(NotImplementedError, vf.get_graph_with_ghosts)
 
573
            return
 
574
        # test key graph related apis: getncestry, _graph, get_parents
 
575
        # has_version
 
576
        # - these are ghost unaware and must not be reflect ghosts
 
577
        self.assertEqual([u'notbxbfse'], vf.get_ancestry(u'notbxbfse'))
 
578
        self.assertEqual([], vf.get_parents(u'notbxbfse'))
 
579
        self.assertEqual({u'notbxbfse':[]}, vf.get_graph())
 
580
        self.assertFalse(vf.has_version(u'b\xbfse'))
 
581
        # we have _with_ghost apis to give us ghost information.
 
582
        self.assertEqual([u'b\xbfse', u'notbxbfse'], vf.get_ancestry_with_ghosts([u'notbxbfse']))
 
583
        self.assertEqual([u'b\xbfse'], vf.get_parents_with_ghosts(u'notbxbfse'))
 
584
        self.assertEqual({u'notbxbfse':[u'b\xbfse']}, vf.get_graph_with_ghosts())
 
585
        self.assertTrue(vf.has_ghost(u'b\xbfse'))
 
586
        # if we add something that is a ghost of another, it should correct the
 
587
        # results of the prior apis
 
588
        vf.add_lines(u'b\xbfse', [], [])
 
589
        self.assertEqual([u'b\xbfse', u'notbxbfse'], vf.get_ancestry([u'notbxbfse']))
 
590
        self.assertEqual([u'b\xbfse'], vf.get_parents(u'notbxbfse'))
 
591
        self.assertEqual({u'b\xbfse':[],
 
592
                          u'notbxbfse':[u'b\xbfse'],
 
593
                          },
 
594
                         vf.get_graph())
 
595
        self.assertTrue(vf.has_version(u'b\xbfse'))
 
596
        # we have _with_ghost apis to give us ghost information.
 
597
        self.assertEqual([u'b\xbfse', u'notbxbfse'], vf.get_ancestry_with_ghosts([u'notbxbfse']))
 
598
        self.assertEqual([u'b\xbfse'], vf.get_parents_with_ghosts(u'notbxbfse'))
 
599
        self.assertEqual({u'b\xbfse':[],
 
600
                          u'notbxbfse':[u'b\xbfse'],
 
601
                          },
 
602
                         vf.get_graph_with_ghosts())
 
603
        self.assertFalse(vf.has_ghost(u'b\xbfse'))
 
604
 
 
605
    def test_add_lines_with_ghosts_after_normal_revs(self):
 
606
        # some versioned file formats allow lines to be added with parent
 
607
        # information that is > than that in the format. Formats that do
 
608
        # not support this need to raise NotImplementedError on the
 
609
        # add_lines_with_ghosts api.
 
610
        vf = self.get_file()
 
611
        # probe for ghost support
 
612
        try:
 
613
            vf.has_ghost('hoo')
 
614
        except NotImplementedError:
 
615
            return
 
616
        vf.add_lines_with_ghosts('base', [], ['line\n', 'line_b\n'])
 
617
        vf.add_lines_with_ghosts('references_ghost',
 
618
                                 ['base', 'a_ghost'],
 
619
                                 ['line\n', 'line_b\n', 'line_c\n'])
 
620
        origins = vf.annotate('references_ghost')
 
621
        self.assertEquals(('base', 'line\n'), origins[0])
 
622
        self.assertEquals(('base', 'line_b\n'), origins[1])
 
623
        self.assertEquals(('references_ghost', 'line_c\n'), origins[2])
 
624
 
 
625
    def test_readonly_mode(self):
 
626
        transport = get_transport(self.get_url('.'))
 
627
        factory = self.get_factory()
 
628
        vf = factory('id', transport, 0777, create=True, access_mode='w')
 
629
        vf = factory('id', transport, access_mode='r')
 
630
        self.assertRaises(errors.ReadOnlyError, vf.add_delta, '', [], '', '', False, [])
 
631
        self.assertRaises(errors.ReadOnlyError, vf.add_lines, 'base', [], [])
 
632
        self.assertRaises(errors.ReadOnlyError,
 
633
                          vf.add_lines_with_ghosts,
 
634
                          'base',
 
635
                          [],
 
636
                          [])
 
637
        self.assertRaises(errors.ReadOnlyError, vf.fix_parents, 'base', [])
 
638
        self.assertRaises(errors.ReadOnlyError, vf.join, 'base')
 
639
        self.assertRaises(errors.ReadOnlyError, vf.clone_text, 'base', 'bar', ['foo'])
 
640
        
 
641
 
 
642
class TestWeave(TestCaseWithTransport, VersionedFileTestMixIn):
 
643
 
 
644
    def get_file(self, name='foo'):
 
645
        return WeaveFile(name, get_transport(self.get_url('.')), create=True)
 
646
 
 
647
    def get_file_corrupted_text(self):
 
648
        w = WeaveFile('foo', get_transport(self.get_url('.')), create=True)
 
649
        w.add_lines('v1', [], ['hello\n'])
 
650
        w.add_lines('v2', ['v1'], ['hello\n', 'there\n'])
 
651
        
 
652
        # We are going to invasively corrupt the text
 
653
        # Make sure the internals of weave are the same
 
654
        self.assertEqual([('{', 0)
 
655
                        , 'hello\n'
 
656
                        , ('}', None)
 
657
                        , ('{', 1)
 
658
                        , 'there\n'
 
659
                        , ('}', None)
 
660
                        ], w._weave)
 
661
        
 
662
        self.assertEqual(['f572d396fae9206628714fb2ce00f72e94f2258f'
 
663
                        , '90f265c6e75f1c8f9ab76dcf85528352c5f215ef'
 
664
                        ], w._sha1s)
 
665
        w.check()
 
666
        
 
667
        # Corrupted
 
668
        w._weave[4] = 'There\n'
 
669
        return w
 
670
 
 
671
    def get_file_corrupted_checksum(self):
 
672
        w = self.get_file_corrupted_text()
 
673
        # Corrected
 
674
        w._weave[4] = 'there\n'
 
675
        self.assertEqual('hello\nthere\n', w.get_text('v2'))
 
676
        
 
677
        #Invalid checksum, first digit changed
 
678
        w._sha1s[1] =  'f0f265c6e75f1c8f9ab76dcf85528352c5f215ef'
 
679
        return w
 
680
 
 
681
    def reopen_file(self, name='foo'):
 
682
        return WeaveFile(name, get_transport(self.get_url('.')))
 
683
 
 
684
    def test_no_implicit_create(self):
 
685
        self.assertRaises(errors.NoSuchFile,
 
686
                          WeaveFile,
 
687
                          'foo',
 
688
                          get_transport(self.get_url('.')))
 
689
 
 
690
    def get_factory(self):
 
691
        return WeaveFile
 
692
 
 
693
 
 
694
class TestKnit(TestCaseWithTransport, VersionedFileTestMixIn):
 
695
 
 
696
    def get_file(self, name='foo'):
 
697
        return KnitVersionedFile(name, get_transport(self.get_url('.')),
 
698
                                 delta=True, create=True)
 
699
 
 
700
    def get_factory(self):
 
701
        return KnitVersionedFile
 
702
 
 
703
    def get_file_corrupted_text(self):
 
704
        knit = self.get_file()
 
705
        knit.add_lines('v1', [], ['hello\n'])
 
706
        knit.add_lines('v2', ['v1'], ['hello\n', 'there\n'])
 
707
        return knit
 
708
 
 
709
    def reopen_file(self, name='foo'):
 
710
        return KnitVersionedFile(name, get_transport(self.get_url('.')), delta=True)
 
711
 
 
712
    def test_detection(self):
 
713
        print "TODO for merging: create a corrupted knit."
 
714
        knit = self.get_file()
 
715
        knit.check()
 
716
 
 
717
    def test_no_implicit_create(self):
 
718
        self.assertRaises(errors.NoSuchFile,
 
719
                          KnitVersionedFile,
 
720
                          'foo',
 
721
                          get_transport(self.get_url('.')))
 
722
 
 
723
 
 
724
class InterString(versionedfile.InterVersionedFile):
 
725
    """An inter-versionedfile optimised code path for strings.
 
726
 
 
727
    This is for use during testing where we use strings as versionedfiles
 
728
    so that none of the default regsitered interversionedfile classes will
 
729
    match - which lets us test the match logic.
 
730
    """
 
731
 
 
732
    @staticmethod
 
733
    def is_compatible(source, target):
 
734
        """InterString is compatible with strings-as-versionedfiles."""
 
735
        return isinstance(source, str) and isinstance(target, str)
 
736
 
 
737
 
 
738
# TODO this and the InterRepository core logic should be consolidatable
 
739
# if we make the registry a separate class though we still need to 
 
740
# test the behaviour in the active registry to catch failure-to-handle-
 
741
# stange-objects
 
742
class TestInterVersionedFile(TestCaseWithTransport):
 
743
 
 
744
    def test_get_default_inter_versionedfile(self):
 
745
        # test that the InterVersionedFile.get(a, b) probes
 
746
        # for a class where is_compatible(a, b) returns
 
747
        # true and returns a default interversionedfile otherwise.
 
748
        # This also tests that the default registered optimised interversionedfile
 
749
        # classes do not barf inappropriately when a surprising versionedfile type
 
750
        # is handed to them.
 
751
        dummy_a = "VersionedFile 1."
 
752
        dummy_b = "VersionedFile 2."
 
753
        self.assertGetsDefaultInterVersionedFile(dummy_a, dummy_b)
 
754
 
 
755
    def assertGetsDefaultInterVersionedFile(self, a, b):
 
756
        """Asserts that InterVersionedFile.get(a, b) -> the default."""
 
757
        inter = versionedfile.InterVersionedFile.get(a, b)
 
758
        self.assertEqual(versionedfile.InterVersionedFile,
 
759
                         inter.__class__)
 
760
        self.assertEqual(a, inter.source)
 
761
        self.assertEqual(b, inter.target)
 
762
 
 
763
    def test_register_inter_versionedfile_class(self):
 
764
        # test that a optimised code path provider - a
 
765
        # InterVersionedFile subclass can be registered and unregistered
 
766
        # and that it is correctly selected when given a versionedfile
 
767
        # pair that it returns true on for the is_compatible static method
 
768
        # check
 
769
        dummy_a = "VersionedFile 1."
 
770
        dummy_b = "VersionedFile 2."
 
771
        versionedfile.InterVersionedFile.register_optimiser(InterString)
 
772
        try:
 
773
            # we should get the default for something InterString returns False
 
774
            # to
 
775
            self.assertFalse(InterString.is_compatible(dummy_a, None))
 
776
            self.assertGetsDefaultInterVersionedFile(dummy_a, None)
 
777
            # and we should get an InterString for a pair it 'likes'
 
778
            self.assertTrue(InterString.is_compatible(dummy_a, dummy_b))
 
779
            inter = versionedfile.InterVersionedFile.get(dummy_a, dummy_b)
 
780
            self.assertEqual(InterString, inter.__class__)
 
781
            self.assertEqual(dummy_a, inter.source)
 
782
            self.assertEqual(dummy_b, inter.target)
 
783
        finally:
 
784
            versionedfile.InterVersionedFile.unregister_optimiser(InterString)
 
785
        # now we should get the default InterVersionedFile object again.
 
786
        self.assertGetsDefaultInterVersionedFile(dummy_a, dummy_b)
 
787
 
 
788
 
 
789
class TestReadonlyHttpMixin(object):
 
790
 
 
791
    def test_readonly_http_works(self):
 
792
        # we should be able to read from http with a versioned file.
 
793
        vf = self.get_file()
 
794
        vf.add_lines('1', [], ['a\n'])
 
795
        vf.add_lines('2', ['1'], ['b\n', 'a\n'])
 
796
        readonly_vf = self.get_factory()('foo', get_transport(self.get_readonly_url('.')))
 
797
        for version in readonly_vf.versions():
 
798
            readonly_vf.get_lines(version)
 
799
 
 
800
 
 
801
class TestWeaveHTTP(TestCaseWithWebserver, TestReadonlyHttpMixin):
 
802
 
 
803
    def get_file(self):
 
804
        return WeaveFile('foo', get_transport(self.get_url('.')), create=True)
 
805
 
 
806
    def get_factory(self):
 
807
        return WeaveFile
 
808
 
 
809
 
 
810
class TestKnitHTTP(TestCaseWithWebserver, TestReadonlyHttpMixin):
 
811
 
 
812
    def get_file(self):
 
813
        return KnitVersionedFile('foo', get_transport(self.get_url('.')),
 
814
                                 delta=True, create=True)
 
815
 
 
816
    def get_factory(self):
 
817
        return KnitVersionedFile