/brz/remove-bazaar

To get this branch, use:
bzr branch http://gegoxaren.bato24.eu/bzr/brz/remove-bazaar

« back to all changes in this revision

Viewing changes to bzrlib/tests/test_knit.py

  • Committer: John Arbash Meinel
  • Date: 2006-12-18 15:37:29 UTC
  • mto: This revision was merged to the branch mainline in revision 2195.
  • Revision ID: john@arbash-meinel.com-20061218153729-se570hwn14rpaqbw
Correct the bzr+http:// information, since it wasn't in 0.12, but will be in 0.14

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
# Copyright (C) 2005, 2006 Canonical Ltd
 
2
#
 
3
# This program is free software; you can redistribute it and/or modify
 
4
# it under the terms of the GNU General Public License as published by
 
5
# the Free Software Foundation; either version 2 of the License, or
 
6
# (at your option) any later version.
 
7
#
 
8
# This program is distributed in the hope that it will be useful,
 
9
# but WITHOUT ANY WARRANTY; without even the implied warranty of
 
10
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
11
# GNU General Public License for more details.
 
12
#
 
13
# You should have received a copy of the GNU General Public License
 
14
# along with this program; if not, write to the Free Software
 
15
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
 
16
 
 
17
"""Tests for Knit data structure"""
 
18
 
 
19
 
 
20
import difflib
 
21
 
 
22
 
 
23
from bzrlib import errors
 
24
from bzrlib.errors import KnitError, RevisionAlreadyPresent, NoSuchFile
 
25
from bzrlib.knit import (
 
26
    KnitContent,
 
27
    KnitVersionedFile,
 
28
    KnitPlainFactory,
 
29
    KnitAnnotateFactory,
 
30
    WeaveToKnit)
 
31
from bzrlib.osutils import split_lines
 
32
from bzrlib.tests import TestCase, TestCaseWithTransport
 
33
from bzrlib.transport import TransportLogger, get_transport
 
34
from bzrlib.transport.memory import MemoryTransport
 
35
from bzrlib.weave import Weave
 
36
 
 
37
 
 
38
class KnitContentTests(TestCase):
 
39
 
 
40
    def test_constructor(self):
 
41
        content = KnitContent([])
 
42
 
 
43
    def test_text(self):
 
44
        content = KnitContent([])
 
45
        self.assertEqual(content.text(), [])
 
46
 
 
47
        content = KnitContent([("origin1", "text1"), ("origin2", "text2")])
 
48
        self.assertEqual(content.text(), ["text1", "text2"])
 
49
 
 
50
    def test_annotate(self):
 
51
        content = KnitContent([])
 
52
        self.assertEqual(content.annotate(), [])
 
53
 
 
54
        content = KnitContent([("origin1", "text1"), ("origin2", "text2")])
 
55
        self.assertEqual(content.annotate(),
 
56
            [("origin1", "text1"), ("origin2", "text2")])
 
57
 
 
58
    def test_annotate_iter(self):
 
59
        content = KnitContent([])
 
60
        it = content.annotate_iter()
 
61
        self.assertRaises(StopIteration, it.next)
 
62
 
 
63
        content = KnitContent([("origin1", "text1"), ("origin2", "text2")])
 
64
        it = content.annotate_iter()
 
65
        self.assertEqual(it.next(), ("origin1", "text1"))
 
66
        self.assertEqual(it.next(), ("origin2", "text2"))
 
67
        self.assertRaises(StopIteration, it.next)
 
68
 
 
69
    def test_copy(self):
 
70
        content = KnitContent([("origin1", "text1"), ("origin2", "text2")])
 
71
        copy = content.copy()
 
72
        self.assertIsInstance(copy, KnitContent)
 
73
        self.assertEqual(copy.annotate(),
 
74
            [("origin1", "text1"), ("origin2", "text2")])
 
75
 
 
76
    def test_line_delta(self):
 
77
        content1 = KnitContent([("", "a"), ("", "b")])
 
78
        content2 = KnitContent([("", "a"), ("", "a"), ("", "c")])
 
79
        self.assertEqual(content1.line_delta(content2),
 
80
            [(1, 2, 2, [("", "a"), ("", "c")])])
 
81
 
 
82
    def test_line_delta_iter(self):
 
83
        content1 = KnitContent([("", "a"), ("", "b")])
 
84
        content2 = KnitContent([("", "a"), ("", "a"), ("", "c")])
 
85
        it = content1.line_delta_iter(content2)
 
86
        self.assertEqual(it.next(), (1, 2, 2, [("", "a"), ("", "c")]))
 
87
        self.assertRaises(StopIteration, it.next)
 
88
 
 
89
 
 
90
class KnitTests(TestCaseWithTransport):
 
91
    """Class containing knit test helper routines."""
 
92
 
 
93
    def make_test_knit(self, annotate=False, delay_create=False):
 
94
        if not annotate:
 
95
            factory = KnitPlainFactory()
 
96
        else:
 
97
            factory = None
 
98
        return KnitVersionedFile('test', get_transport('.'), access_mode='w',
 
99
                                 factory=factory, create=True,
 
100
                                 delay_create=delay_create)
 
101
 
 
102
 
 
103
class BasicKnitTests(KnitTests):
 
104
 
 
105
    def add_stock_one_and_one_a(self, k):
 
106
        k.add_lines('text-1', [], split_lines(TEXT_1))
 
107
        k.add_lines('text-1a', ['text-1'], split_lines(TEXT_1A))
 
108
 
 
109
    def test_knit_constructor(self):
 
110
        """Construct empty k"""
 
111
        self.make_test_knit()
 
112
 
 
113
    def test_knit_add(self):
 
114
        """Store one text in knit and retrieve"""
 
115
        k = self.make_test_knit()
 
116
        k.add_lines('text-1', [], split_lines(TEXT_1))
 
117
        self.assertTrue(k.has_version('text-1'))
 
118
        self.assertEqualDiff(''.join(k.get_lines('text-1')), TEXT_1)
 
119
 
 
120
    def test_knit_reload(self):
 
121
        # test that the content in a reloaded knit is correct
 
122
        k = self.make_test_knit()
 
123
        k.add_lines('text-1', [], split_lines(TEXT_1))
 
124
        del k
 
125
        k2 = KnitVersionedFile('test', get_transport('.'), access_mode='r', factory=KnitPlainFactory(), create=True)
 
126
        self.assertTrue(k2.has_version('text-1'))
 
127
        self.assertEqualDiff(''.join(k2.get_lines('text-1')), TEXT_1)
 
128
 
 
129
    def test_knit_several(self):
 
130
        """Store several texts in a knit"""
 
131
        k = self.make_test_knit()
 
132
        k.add_lines('text-1', [], split_lines(TEXT_1))
 
133
        k.add_lines('text-2', [], split_lines(TEXT_2))
 
134
        self.assertEqualDiff(''.join(k.get_lines('text-1')), TEXT_1)
 
135
        self.assertEqualDiff(''.join(k.get_lines('text-2')), TEXT_2)
 
136
        
 
137
    def test_repeated_add(self):
 
138
        """Knit traps attempt to replace existing version"""
 
139
        k = self.make_test_knit()
 
140
        k.add_lines('text-1', [], split_lines(TEXT_1))
 
141
        self.assertRaises(RevisionAlreadyPresent, 
 
142
                k.add_lines,
 
143
                'text-1', [], split_lines(TEXT_1))
 
144
 
 
145
    def test_empty(self):
 
146
        k = self.make_test_knit(True)
 
147
        k.add_lines('text-1', [], [])
 
148
        self.assertEquals(k.get_lines('text-1'), [])
 
149
 
 
150
    def test_incomplete(self):
 
151
        """Test if texts without a ending line-end can be inserted and
 
152
        extracted."""
 
153
        k = KnitVersionedFile('test', get_transport('.'), delta=False, create=True)
 
154
        k.add_lines('text-1', [], ['a\n',    'b'  ])
 
155
        k.add_lines('text-2', ['text-1'], ['a\rb\n', 'b\n'])
 
156
        # reopening ensures maximum room for confusion
 
157
        k = KnitVersionedFile('test', get_transport('.'), delta=False, create=True)
 
158
        self.assertEquals(k.get_lines('text-1'), ['a\n',    'b'  ])
 
159
        self.assertEquals(k.get_lines('text-2'), ['a\rb\n', 'b\n'])
 
160
 
 
161
    def test_delta(self):
 
162
        """Expression of knit delta as lines"""
 
163
        k = self.make_test_knit()
 
164
        td = list(line_delta(TEXT_1.splitlines(True),
 
165
                             TEXT_1A.splitlines(True)))
 
166
        self.assertEqualDiff(''.join(td), delta_1_1a)
 
167
        out = apply_line_delta(TEXT_1.splitlines(True), td)
 
168
        self.assertEqualDiff(''.join(out), TEXT_1A)
 
169
 
 
170
    def test_add_with_parents(self):
 
171
        """Store in knit with parents"""
 
172
        k = self.make_test_knit()
 
173
        self.add_stock_one_and_one_a(k)
 
174
        self.assertEquals(k.get_parents('text-1'), [])
 
175
        self.assertEquals(k.get_parents('text-1a'), ['text-1'])
 
176
 
 
177
    def test_ancestry(self):
 
178
        """Store in knit with parents"""
 
179
        k = self.make_test_knit()
 
180
        self.add_stock_one_and_one_a(k)
 
181
        self.assertEquals(set(k.get_ancestry(['text-1a'])), set(['text-1a', 'text-1']))
 
182
 
 
183
    def test_add_delta(self):
 
184
        """Store in knit with parents"""
 
185
        k = KnitVersionedFile('test', get_transport('.'), factory=KnitPlainFactory(),
 
186
            delta=True, create=True)
 
187
        self.add_stock_one_and_one_a(k)
 
188
        k.clear_cache()
 
189
        self.assertEqualDiff(''.join(k.get_lines('text-1a')), TEXT_1A)
 
190
 
 
191
    def test_annotate(self):
 
192
        """Annotations"""
 
193
        k = KnitVersionedFile('knit', get_transport('.'), factory=KnitAnnotateFactory(),
 
194
            delta=True, create=True)
 
195
        self.insert_and_test_small_annotate(k)
 
196
 
 
197
    def insert_and_test_small_annotate(self, k):
 
198
        """test annotation with k works correctly."""
 
199
        k.add_lines('text-1', [], ['a\n', 'b\n'])
 
200
        k.add_lines('text-2', ['text-1'], ['a\n', 'c\n'])
 
201
 
 
202
        origins = k.annotate('text-2')
 
203
        self.assertEquals(origins[0], ('text-1', 'a\n'))
 
204
        self.assertEquals(origins[1], ('text-2', 'c\n'))
 
205
 
 
206
    def test_annotate_fulltext(self):
 
207
        """Annotations"""
 
208
        k = KnitVersionedFile('knit', get_transport('.'), factory=KnitAnnotateFactory(),
 
209
            delta=False, create=True)
 
210
        self.insert_and_test_small_annotate(k)
 
211
 
 
212
    def test_annotate_merge_1(self):
 
213
        k = self.make_test_knit(True)
 
214
        k.add_lines('text-a1', [], ['a\n', 'b\n'])
 
215
        k.add_lines('text-a2', [], ['d\n', 'c\n'])
 
216
        k.add_lines('text-am', ['text-a1', 'text-a2'], ['d\n', 'b\n'])
 
217
        origins = k.annotate('text-am')
 
218
        self.assertEquals(origins[0], ('text-a2', 'd\n'))
 
219
        self.assertEquals(origins[1], ('text-a1', 'b\n'))
 
220
 
 
221
    def test_annotate_merge_2(self):
 
222
        k = self.make_test_knit(True)
 
223
        k.add_lines('text-a1', [], ['a\n', 'b\n', 'c\n'])
 
224
        k.add_lines('text-a2', [], ['x\n', 'y\n', 'z\n'])
 
225
        k.add_lines('text-am', ['text-a1', 'text-a2'], ['a\n', 'y\n', 'c\n'])
 
226
        origins = k.annotate('text-am')
 
227
        self.assertEquals(origins[0], ('text-a1', 'a\n'))
 
228
        self.assertEquals(origins[1], ('text-a2', 'y\n'))
 
229
        self.assertEquals(origins[2], ('text-a1', 'c\n'))
 
230
 
 
231
    def test_annotate_merge_9(self):
 
232
        k = self.make_test_knit(True)
 
233
        k.add_lines('text-a1', [], ['a\n', 'b\n', 'c\n'])
 
234
        k.add_lines('text-a2', [], ['x\n', 'y\n', 'z\n'])
 
235
        k.add_lines('text-am', ['text-a1', 'text-a2'], ['k\n', 'y\n', 'c\n'])
 
236
        origins = k.annotate('text-am')
 
237
        self.assertEquals(origins[0], ('text-am', 'k\n'))
 
238
        self.assertEquals(origins[1], ('text-a2', 'y\n'))
 
239
        self.assertEquals(origins[2], ('text-a1', 'c\n'))
 
240
 
 
241
    def test_annotate_merge_3(self):
 
242
        k = self.make_test_knit(True)
 
243
        k.add_lines('text-a1', [], ['a\n', 'b\n', 'c\n'])
 
244
        k.add_lines('text-a2', [] ,['x\n', 'y\n', 'z\n'])
 
245
        k.add_lines('text-am', ['text-a1', 'text-a2'], ['k\n', 'y\n', 'z\n'])
 
246
        origins = k.annotate('text-am')
 
247
        self.assertEquals(origins[0], ('text-am', 'k\n'))
 
248
        self.assertEquals(origins[1], ('text-a2', 'y\n'))
 
249
        self.assertEquals(origins[2], ('text-a2', 'z\n'))
 
250
 
 
251
    def test_annotate_merge_4(self):
 
252
        k = self.make_test_knit(True)
 
253
        k.add_lines('text-a1', [], ['a\n', 'b\n', 'c\n'])
 
254
        k.add_lines('text-a2', [], ['x\n', 'y\n', 'z\n'])
 
255
        k.add_lines('text-a3', ['text-a1'], ['a\n', 'b\n', 'p\n'])
 
256
        k.add_lines('text-am', ['text-a2', 'text-a3'], ['a\n', 'b\n', 'z\n'])
 
257
        origins = k.annotate('text-am')
 
258
        self.assertEquals(origins[0], ('text-a1', 'a\n'))
 
259
        self.assertEquals(origins[1], ('text-a1', 'b\n'))
 
260
        self.assertEquals(origins[2], ('text-a2', 'z\n'))
 
261
 
 
262
    def test_annotate_merge_5(self):
 
263
        k = self.make_test_knit(True)
 
264
        k.add_lines('text-a1', [], ['a\n', 'b\n', 'c\n'])
 
265
        k.add_lines('text-a2', [], ['d\n', 'e\n', 'f\n'])
 
266
        k.add_lines('text-a3', [], ['x\n', 'y\n', 'z\n'])
 
267
        k.add_lines('text-am',
 
268
                    ['text-a1', 'text-a2', 'text-a3'],
 
269
                    ['a\n', 'e\n', 'z\n'])
 
270
        origins = k.annotate('text-am')
 
271
        self.assertEquals(origins[0], ('text-a1', 'a\n'))
 
272
        self.assertEquals(origins[1], ('text-a2', 'e\n'))
 
273
        self.assertEquals(origins[2], ('text-a3', 'z\n'))
 
274
 
 
275
    def test_annotate_file_cherry_pick(self):
 
276
        k = self.make_test_knit(True)
 
277
        k.add_lines('text-1', [], ['a\n', 'b\n', 'c\n'])
 
278
        k.add_lines('text-2', ['text-1'], ['d\n', 'e\n', 'f\n'])
 
279
        k.add_lines('text-3', ['text-2', 'text-1'], ['a\n', 'b\n', 'c\n'])
 
280
        origins = k.annotate('text-3')
 
281
        self.assertEquals(origins[0], ('text-1', 'a\n'))
 
282
        self.assertEquals(origins[1], ('text-1', 'b\n'))
 
283
        self.assertEquals(origins[2], ('text-1', 'c\n'))
 
284
 
 
285
    def test_knit_join(self):
 
286
        """Store in knit with parents"""
 
287
        k1 = KnitVersionedFile('test1', get_transport('.'), factory=KnitPlainFactory(), create=True)
 
288
        k1.add_lines('text-a', [], split_lines(TEXT_1))
 
289
        k1.add_lines('text-b', ['text-a'], split_lines(TEXT_1))
 
290
 
 
291
        k1.add_lines('text-c', [], split_lines(TEXT_1))
 
292
        k1.add_lines('text-d', ['text-c'], split_lines(TEXT_1))
 
293
 
 
294
        k1.add_lines('text-m', ['text-b', 'text-d'], split_lines(TEXT_1))
 
295
 
 
296
        k2 = KnitVersionedFile('test2', get_transport('.'), factory=KnitPlainFactory(), create=True)
 
297
        count = k2.join(k1, version_ids=['text-m'])
 
298
        self.assertEquals(count, 5)
 
299
        self.assertTrue(k2.has_version('text-a'))
 
300
        self.assertTrue(k2.has_version('text-c'))
 
301
 
 
302
    def test_reannotate(self):
 
303
        k1 = KnitVersionedFile('knit1', get_transport('.'),
 
304
                               factory=KnitAnnotateFactory(), create=True)
 
305
        # 0
 
306
        k1.add_lines('text-a', [], ['a\n', 'b\n'])
 
307
        # 1
 
308
        k1.add_lines('text-b', ['text-a'], ['a\n', 'c\n'])
 
309
 
 
310
        k2 = KnitVersionedFile('test2', get_transport('.'),
 
311
                               factory=KnitAnnotateFactory(), create=True)
 
312
        k2.join(k1, version_ids=['text-b'])
 
313
 
 
314
        # 2
 
315
        k1.add_lines('text-X', ['text-b'], ['a\n', 'b\n'])
 
316
        # 2
 
317
        k2.add_lines('text-c', ['text-b'], ['z\n', 'c\n'])
 
318
        # 3
 
319
        k2.add_lines('text-Y', ['text-b'], ['b\n', 'c\n'])
 
320
 
 
321
        # test-c will have index 3
 
322
        k1.join(k2, version_ids=['text-c'])
 
323
 
 
324
        lines = k1.get_lines('text-c')
 
325
        self.assertEquals(lines, ['z\n', 'c\n'])
 
326
 
 
327
        origins = k1.annotate('text-c')
 
328
        self.assertEquals(origins[0], ('text-c', 'z\n'))
 
329
        self.assertEquals(origins[1], ('text-b', 'c\n'))
 
330
 
 
331
    def test_get_line_delta_texts(self):
 
332
        """Make sure we can call get_texts on text with reused line deltas"""
 
333
        k1 = KnitVersionedFile('test1', get_transport('.'), 
 
334
                               factory=KnitPlainFactory(), create=True)
 
335
        for t in range(3):
 
336
            if t == 0:
 
337
                parents = []
 
338
            else:
 
339
                parents = ['%d' % (t-1)]
 
340
            k1.add_lines('%d' % t, parents, ['hello\n'] * t)
 
341
        k1.get_texts(('%d' % t) for t in range(3))
 
342
        
 
343
    def test_iter_lines_reads_in_order(self):
 
344
        t = MemoryTransport()
 
345
        instrumented_t = TransportLogger(t)
 
346
        k1 = KnitVersionedFile('id', instrumented_t, create=True, delta=True)
 
347
        self.assertEqual([('id.kndx',)], instrumented_t._calls)
 
348
        # add texts with no required ordering
 
349
        k1.add_lines('base', [], ['text\n'])
 
350
        k1.add_lines('base2', [], ['text2\n'])
 
351
        k1.clear_cache()
 
352
        instrumented_t._calls = []
 
353
        # request a last-first iteration
 
354
        results = list(k1.iter_lines_added_or_present_in_versions(['base2', 'base']))
 
355
        self.assertEqual([('id.knit', [(0, 87), (87, 89)])], instrumented_t._calls)
 
356
        self.assertEqual(['text\n', 'text2\n'], results)
 
357
 
 
358
    def test_create_empty_annotated(self):
 
359
        k1 = self.make_test_knit(True)
 
360
        # 0
 
361
        k1.add_lines('text-a', [], ['a\n', 'b\n'])
 
362
        k2 = k1.create_empty('t', MemoryTransport())
 
363
        self.assertTrue(isinstance(k2.factory, KnitAnnotateFactory))
 
364
        self.assertEqual(k1.delta, k2.delta)
 
365
        # the generic test checks for empty content and file class
 
366
 
 
367
    def test_knit_format(self):
 
368
        # this tests that a new knit index file has the expected content
 
369
        # and that is writes the data we expect as records are added.
 
370
        knit = self.make_test_knit(True)
 
371
        # Now knit files are not created until we first add data to them
 
372
        self.assertFileEqual("# bzr knit index 8\n", 'test.kndx')
 
373
        knit.add_lines_with_ghosts('revid', ['a_ghost'], ['a\n'])
 
374
        self.assertFileEqual(
 
375
            "# bzr knit index 8\n"
 
376
            "\n"
 
377
            "revid fulltext 0 84 .a_ghost :",
 
378
            'test.kndx')
 
379
        knit.add_lines_with_ghosts('revid2', ['revid'], ['a\n'])
 
380
        self.assertFileEqual(
 
381
            "# bzr knit index 8\n"
 
382
            "\nrevid fulltext 0 84 .a_ghost :"
 
383
            "\nrevid2 line-delta 84 82 0 :",
 
384
            'test.kndx')
 
385
        # we should be able to load this file again
 
386
        knit = KnitVersionedFile('test', get_transport('.'), access_mode='r')
 
387
        self.assertEqual(['revid', 'revid2'], knit.versions())
 
388
        # write a short write to the file and ensure that its ignored
 
389
        indexfile = file('test.kndx', 'at')
 
390
        indexfile.write('\nrevid3 line-delta 166 82 1 2 3 4 5 .phwoar:demo ')
 
391
        indexfile.close()
 
392
        # we should be able to load this file again
 
393
        knit = KnitVersionedFile('test', get_transport('.'), access_mode='w')
 
394
        self.assertEqual(['revid', 'revid2'], knit.versions())
 
395
        # and add a revision with the same id the failed write had
 
396
        knit.add_lines('revid3', ['revid2'], ['a\n'])
 
397
        # and when reading it revid3 should now appear.
 
398
        knit = KnitVersionedFile('test', get_transport('.'), access_mode='r')
 
399
        self.assertEqual(['revid', 'revid2', 'revid3'], knit.versions())
 
400
        self.assertEqual(['revid2'], knit.get_parents('revid3'))
 
401
 
 
402
    def test_delay_create(self):
 
403
        """Test that passing delay_create=True creates files late"""
 
404
        knit = self.make_test_knit(annotate=True, delay_create=True)
 
405
        self.failIfExists('test.knit')
 
406
        self.failIfExists('test.kndx')
 
407
        knit.add_lines_with_ghosts('revid', ['a_ghost'], ['a\n'])
 
408
        self.failUnlessExists('test.knit')
 
409
        self.assertFileEqual(
 
410
            "# bzr knit index 8\n"
 
411
            "\n"
 
412
            "revid fulltext 0 84 .a_ghost :",
 
413
            'test.kndx')
 
414
 
 
415
    def test_create_parent_dir(self):
 
416
        """create_parent_dir can create knits in nonexistant dirs"""
 
417
        # Has no effect if we don't set 'delay_create'
 
418
        trans = get_transport('.')
 
419
        self.assertRaises(NoSuchFile, KnitVersionedFile, 'dir/test',
 
420
                          trans, access_mode='w', factory=None,
 
421
                          create=True, create_parent_dir=True)
 
422
        # Nothing should have changed yet
 
423
        knit = KnitVersionedFile('dir/test', trans, access_mode='w',
 
424
                                 factory=None, create=True,
 
425
                                 create_parent_dir=True,
 
426
                                 delay_create=True)
 
427
        self.failIfExists('dir/test.knit')
 
428
        self.failIfExists('dir/test.kndx')
 
429
        self.failIfExists('dir')
 
430
        knit.add_lines('revid', [], ['a\n'])
 
431
        self.failUnlessExists('dir')
 
432
        self.failUnlessExists('dir/test.knit')
 
433
        self.assertFileEqual(
 
434
            "# bzr knit index 8\n"
 
435
            "\n"
 
436
            "revid fulltext 0 84  :",
 
437
            'dir/test.kndx')
 
438
 
 
439
    def test_create_mode_700(self):
 
440
        trans = get_transport('.')
 
441
        if not trans._can_roundtrip_unix_modebits():
 
442
            # Can't roundtrip, so no need to run this test
 
443
            return
 
444
        knit = KnitVersionedFile('dir/test', trans, access_mode='w',
 
445
                                 factory=None, create=True,
 
446
                                 create_parent_dir=True,
 
447
                                 delay_create=True,
 
448
                                 file_mode=0600,
 
449
                                 dir_mode=0700)
 
450
        knit.add_lines('revid', [], ['a\n'])
 
451
        self.assertTransportMode(trans, 'dir', 0700)
 
452
        self.assertTransportMode(trans, 'dir/test.knit', 0600)
 
453
        self.assertTransportMode(trans, 'dir/test.kndx', 0600)
 
454
 
 
455
    def test_create_mode_770(self):
 
456
        trans = get_transport('.')
 
457
        if not trans._can_roundtrip_unix_modebits():
 
458
            # Can't roundtrip, so no need to run this test
 
459
            return
 
460
        knit = KnitVersionedFile('dir/test', trans, access_mode='w',
 
461
                                 factory=None, create=True,
 
462
                                 create_parent_dir=True,
 
463
                                 delay_create=True,
 
464
                                 file_mode=0660,
 
465
                                 dir_mode=0770)
 
466
        knit.add_lines('revid', [], ['a\n'])
 
467
        self.assertTransportMode(trans, 'dir', 0770)
 
468
        self.assertTransportMode(trans, 'dir/test.knit', 0660)
 
469
        self.assertTransportMode(trans, 'dir/test.kndx', 0660)
 
470
 
 
471
    def test_create_mode_777(self):
 
472
        trans = get_transport('.')
 
473
        if not trans._can_roundtrip_unix_modebits():
 
474
            # Can't roundtrip, so no need to run this test
 
475
            return
 
476
        knit = KnitVersionedFile('dir/test', trans, access_mode='w',
 
477
                                 factory=None, create=True,
 
478
                                 create_parent_dir=True,
 
479
                                 delay_create=True,
 
480
                                 file_mode=0666,
 
481
                                 dir_mode=0777)
 
482
        knit.add_lines('revid', [], ['a\n'])
 
483
        self.assertTransportMode(trans, 'dir', 0777)
 
484
        self.assertTransportMode(trans, 'dir/test.knit', 0666)
 
485
        self.assertTransportMode(trans, 'dir/test.kndx', 0666)
 
486
 
 
487
    def test_plan_merge(self):
 
488
        my_knit = self.make_test_knit(annotate=True)
 
489
        my_knit.add_lines('text1', [], split_lines(TEXT_1))
 
490
        my_knit.add_lines('text1a', ['text1'], split_lines(TEXT_1A))
 
491
        my_knit.add_lines('text1b', ['text1'], split_lines(TEXT_1B))
 
492
        plan = list(my_knit.plan_merge('text1a', 'text1b'))
 
493
        for plan_line, expected_line in zip(plan, AB_MERGE):
 
494
            self.assertEqual(plan_line, expected_line)
 
495
 
 
496
 
 
497
TEXT_1 = """\
 
498
Banana cup cakes:
 
499
 
 
500
- bananas
 
501
- eggs
 
502
- broken tea cups
 
503
"""
 
504
 
 
505
TEXT_1A = """\
 
506
Banana cup cake recipe
 
507
(serves 6)
 
508
 
 
509
- bananas
 
510
- eggs
 
511
- broken tea cups
 
512
- self-raising flour
 
513
"""
 
514
 
 
515
TEXT_1B = """\
 
516
Banana cup cake recipe
 
517
 
 
518
- bananas (do not use plantains!!!)
 
519
- broken tea cups
 
520
- flour
 
521
"""
 
522
 
 
523
delta_1_1a = """\
 
524
0,1,2
 
525
Banana cup cake recipe
 
526
(serves 6)
 
527
5,5,1
 
528
- self-raising flour
 
529
"""
 
530
 
 
531
TEXT_2 = """\
 
532
Boeuf bourguignon
 
533
 
 
534
- beef
 
535
- red wine
 
536
- small onions
 
537
- carrot
 
538
- mushrooms
 
539
"""
 
540
 
 
541
AB_MERGE_TEXT="""unchanged|Banana cup cake recipe
 
542
new-a|(serves 6)
 
543
unchanged|
 
544
killed-b|- bananas
 
545
killed-b|- eggs
 
546
new-b|- bananas (do not use plantains!!!)
 
547
unchanged|- broken tea cups
 
548
new-a|- self-raising flour
 
549
new-b|- flour
 
550
"""
 
551
AB_MERGE=[tuple(l.split('|')) for l in AB_MERGE_TEXT.splitlines(True)]
 
552
 
 
553
 
 
554
def line_delta(from_lines, to_lines):
 
555
    """Generate line-based delta from one text to another"""
 
556
    s = difflib.SequenceMatcher(None, from_lines, to_lines)
 
557
    for op in s.get_opcodes():
 
558
        if op[0] == 'equal':
 
559
            continue
 
560
        yield '%d,%d,%d\n' % (op[1], op[2], op[4]-op[3])
 
561
        for i in range(op[3], op[4]):
 
562
            yield to_lines[i]
 
563
 
 
564
 
 
565
def apply_line_delta(basis_lines, delta_lines):
 
566
    """Apply a line-based perfect diff
 
567
    
 
568
    basis_lines -- text to apply the patch to
 
569
    delta_lines -- diff instructions and content
 
570
    """
 
571
    out = basis_lines[:]
 
572
    i = 0
 
573
    offset = 0
 
574
    while i < len(delta_lines):
 
575
        l = delta_lines[i]
 
576
        a, b, c = map(long, l.split(','))
 
577
        i = i + 1
 
578
        out[offset+a:offset+b] = delta_lines[i:i+c]
 
579
        i = i + c
 
580
        offset = offset + (b - a) + c
 
581
    return out
 
582
 
 
583
 
 
584
class TestWeaveToKnit(KnitTests):
 
585
 
 
586
    def test_weave_to_knit_matches(self):
 
587
        # check that the WeaveToKnit is_compatible function
 
588
        # registers True for a Weave to a Knit.
 
589
        w = Weave()
 
590
        k = self.make_test_knit()
 
591
        self.failUnless(WeaveToKnit.is_compatible(w, k))
 
592
        self.failIf(WeaveToKnit.is_compatible(k, w))
 
593
        self.failIf(WeaveToKnit.is_compatible(w, w))
 
594
        self.failIf(WeaveToKnit.is_compatible(k, k))
 
595
 
 
596
 
 
597
class TestKnitCaching(KnitTests):
 
598
    
 
599
    def create_knit(self, cache_add=False):
 
600
        k = self.make_test_knit(True)
 
601
        if cache_add:
 
602
            k.enable_cache()
 
603
 
 
604
        k.add_lines('text-1', [], split_lines(TEXT_1))
 
605
        k.add_lines('text-2', [], split_lines(TEXT_2))
 
606
        return k
 
607
 
 
608
    def test_no_caching(self):
 
609
        k = self.create_knit()
 
610
        # Nothing should be cached without setting 'enable_cache'
 
611
        self.assertEqual({}, k._data._cache)
 
612
 
 
613
    def test_cache_add_and_clear(self):
 
614
        k = self.create_knit(True)
 
615
 
 
616
        self.assertEqual(['text-1', 'text-2'], sorted(k._data._cache.keys()))
 
617
 
 
618
        k.clear_cache()
 
619
        self.assertEqual({}, k._data._cache)
 
620
 
 
621
    def test_cache_data_read_raw(self):
 
622
        k = self.create_knit()
 
623
 
 
624
        # Now cache and read
 
625
        k.enable_cache()
 
626
 
 
627
        def read_one_raw(version):
 
628
            pos_map = k._get_components_positions([version])
 
629
            method, pos, size, next = pos_map[version]
 
630
            lst = list(k._data.read_records_iter_raw([(version, pos, size)]))
 
631
            self.assertEqual(1, len(lst))
 
632
            return lst[0]
 
633
 
 
634
        val = read_one_raw('text-1')
 
635
        self.assertEqual({'text-1':val[1]}, k._data._cache)
 
636
 
 
637
        k.clear_cache()
 
638
        # After clear, new reads are not cached
 
639
        self.assertEqual({}, k._data._cache)
 
640
 
 
641
        val2 = read_one_raw('text-1')
 
642
        self.assertEqual(val, val2)
 
643
        self.assertEqual({}, k._data._cache)
 
644
 
 
645
    def test_cache_data_read(self):
 
646
        k = self.create_knit()
 
647
 
 
648
        def read_one(version):
 
649
            pos_map = k._get_components_positions([version])
 
650
            method, pos, size, next = pos_map[version]
 
651
            lst = list(k._data.read_records_iter([(version, pos, size)]))
 
652
            self.assertEqual(1, len(lst))
 
653
            return lst[0]
 
654
 
 
655
        # Now cache and read
 
656
        k.enable_cache()
 
657
 
 
658
        val = read_one('text-2')
 
659
        self.assertEqual(['text-2'], k._data._cache.keys())
 
660
        self.assertEqual('text-2', val[0])
 
661
        content, digest = k._data._parse_record('text-2',
 
662
                                                k._data._cache['text-2'])
 
663
        self.assertEqual(content, val[1])
 
664
        self.assertEqual(digest, val[2])
 
665
 
 
666
        k.clear_cache()
 
667
        self.assertEqual({}, k._data._cache)
 
668
 
 
669
        val2 = read_one('text-2')
 
670
        self.assertEqual(val, val2)
 
671
        self.assertEqual({}, k._data._cache)
 
672
 
 
673
    def test_cache_read(self):
 
674
        k = self.create_knit()
 
675
        k.enable_cache()
 
676
 
 
677
        text = k.get_text('text-1')
 
678
        self.assertEqual(TEXT_1, text)
 
679
        self.assertEqual(['text-1'], k._data._cache.keys())
 
680
 
 
681
        k.clear_cache()
 
682
        self.assertEqual({}, k._data._cache)
 
683
 
 
684
        text = k.get_text('text-1')
 
685
        self.assertEqual(TEXT_1, text)
 
686
        self.assertEqual({}, k._data._cache)
 
687
 
 
688
 
 
689
class TestKnitIndex(KnitTests):
 
690
 
 
691
    def test_add_versions_dictionary_compresses(self):
 
692
        """Adding versions to the index should update the lookup dict"""
 
693
        knit = self.make_test_knit()
 
694
        idx = knit._index
 
695
        idx.add_version('a-1', ['fulltext'], 0, 0, [])
 
696
        self.check_file_contents('test.kndx',
 
697
            '# bzr knit index 8\n'
 
698
            '\n'
 
699
            'a-1 fulltext 0 0  :'
 
700
            )
 
701
        idx.add_versions([('a-2', ['fulltext'], 0, 0, ['a-1']),
 
702
                          ('a-3', ['fulltext'], 0, 0, ['a-2']),
 
703
                         ])
 
704
        self.check_file_contents('test.kndx',
 
705
            '# bzr knit index 8\n'
 
706
            '\n'
 
707
            'a-1 fulltext 0 0  :\n'
 
708
            'a-2 fulltext 0 0 0 :\n'
 
709
            'a-3 fulltext 0 0 1 :'
 
710
            )
 
711
        self.assertEqual(['a-1', 'a-2', 'a-3'], idx._history)
 
712
        self.assertEqual({'a-1':('a-1', ['fulltext'], 0, 0, [], 0),
 
713
                          'a-2':('a-2', ['fulltext'], 0, 0, ['a-1'], 1),
 
714
                          'a-3':('a-3', ['fulltext'], 0, 0, ['a-2'], 2),
 
715
                         }, idx._cache)
 
716
 
 
717
    def test_add_versions_fails_clean(self):
 
718
        """If add_versions fails in the middle, it restores a pristine state.
 
719
 
 
720
        Any modifications that are made to the index are reset if all versions
 
721
        cannot be added.
 
722
        """
 
723
        # This cheats a little bit by passing in a generator which will
 
724
        # raise an exception before the processing finishes
 
725
        # Other possibilities would be to have an version with the wrong number
 
726
        # of entries, or to make the backing transport unable to write any
 
727
        # files.
 
728
 
 
729
        knit = self.make_test_knit()
 
730
        idx = knit._index
 
731
        idx.add_version('a-1', ['fulltext'], 0, 0, [])
 
732
 
 
733
        class StopEarly(Exception):
 
734
            pass
 
735
 
 
736
        def generate_failure():
 
737
            """Add some entries and then raise an exception"""
 
738
            yield ('a-2', ['fulltext'], 0, 0, ['a-1'])
 
739
            yield ('a-3', ['fulltext'], 0, 0, ['a-2'])
 
740
            raise StopEarly()
 
741
 
 
742
        # Assert the pre-condition
 
743
        self.assertEqual(['a-1'], idx._history)
 
744
        self.assertEqual({'a-1':('a-1', ['fulltext'], 0, 0, [], 0)}, idx._cache)
 
745
 
 
746
        self.assertRaises(StopEarly, idx.add_versions, generate_failure())
 
747
 
 
748
        # And it shouldn't be modified
 
749
        self.assertEqual(['a-1'], idx._history)
 
750
        self.assertEqual({'a-1':('a-1', ['fulltext'], 0, 0, [], 0)}, idx._cache)
 
751
 
 
752
    def test_knit_index_ignores_empty_files(self):
 
753
        # There was a race condition in older bzr, where a ^C at the right time
 
754
        # could leave an empty .kndx file, which bzr would later claim was a
 
755
        # corrupted file since the header was not present. In reality, the file
 
756
        # just wasn't created, so it should be ignored.
 
757
        t = get_transport('.')
 
758
        t.put_bytes('test.kndx', '')
 
759
 
 
760
        knit = self.make_test_knit()
 
761
 
 
762
    def test_knit_index_checks_header(self):
 
763
        t = get_transport('.')
 
764
        t.put_bytes('test.kndx', '# not really a knit header\n\n')
 
765
 
 
766
        self.assertRaises(errors.KnitHeaderError, self.make_test_knit)