/brz/remove-bazaar

To get this branch, use:
bzr branch http://gegoxaren.bato24.eu/bzr/brz/remove-bazaar

« back to all changes in this revision

Viewing changes to bzrlib/tests/test_knit.py

  • Committer: John Arbash Meinel
  • Date: 2006-11-29 17:16:58 UTC
  • mto: This revision was merged to the branch mainline in revision 2156.
  • Revision ID: john@arbash-meinel.com-20061129171658-uwphkz5ntsb7bv0r
(Dmitry Vasiliev) pre-lookup encoders to improve performance

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
# Copyright (C) 2005, 2006 Canonical Ltd
 
2
#
 
3
# This program is free software; you can redistribute it and/or modify
 
4
# it under the terms of the GNU General Public License as published by
 
5
# the Free Software Foundation; either version 2 of the License, or
 
6
# (at your option) any later version.
 
7
#
 
8
# This program is distributed in the hope that it will be useful,
 
9
# but WITHOUT ANY WARRANTY; without even the implied warranty of
 
10
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
11
# GNU General Public License for more details.
 
12
#
 
13
# You should have received a copy of the GNU General Public License
 
14
# along with this program; if not, write to the Free Software
 
15
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
 
16
 
 
17
"""Tests for Knit data structure"""
 
18
 
 
19
 
 
20
import difflib
 
21
 
 
22
 
 
23
from bzrlib.errors import KnitError, RevisionAlreadyPresent, NoSuchFile
 
24
from bzrlib.knit import (
 
25
    KnitContent,
 
26
    KnitVersionedFile,
 
27
    KnitPlainFactory,
 
28
    KnitAnnotateFactory,
 
29
    WeaveToKnit)
 
30
from bzrlib.osutils import split_lines
 
31
from bzrlib.tests import TestCase, TestCaseWithTransport
 
32
from bzrlib.transport import TransportLogger, get_transport
 
33
from bzrlib.transport.memory import MemoryTransport
 
34
from bzrlib.weave import Weave
 
35
 
 
36
 
 
37
class KnitContentTests(TestCase):
 
38
 
 
39
    def test_constructor(self):
 
40
        content = KnitContent([])
 
41
 
 
42
    def test_text(self):
 
43
        content = KnitContent([])
 
44
        self.assertEqual(content.text(), [])
 
45
 
 
46
        content = KnitContent([("origin1", "text1"), ("origin2", "text2")])
 
47
        self.assertEqual(content.text(), ["text1", "text2"])
 
48
 
 
49
    def test_annotate(self):
 
50
        content = KnitContent([])
 
51
        self.assertEqual(content.annotate(), [])
 
52
 
 
53
        content = KnitContent([("origin1", "text1"), ("origin2", "text2")])
 
54
        self.assertEqual(content.annotate(),
 
55
            [("origin1", "text1"), ("origin2", "text2")])
 
56
 
 
57
    def test_annotate_iter(self):
 
58
        content = KnitContent([])
 
59
        it = content.annotate_iter()
 
60
        self.assertRaises(StopIteration, it.next)
 
61
 
 
62
        content = KnitContent([("origin1", "text1"), ("origin2", "text2")])
 
63
        it = content.annotate_iter()
 
64
        self.assertEqual(it.next(), ("origin1", "text1"))
 
65
        self.assertEqual(it.next(), ("origin2", "text2"))
 
66
        self.assertRaises(StopIteration, it.next)
 
67
 
 
68
    def test_copy(self):
 
69
        content = KnitContent([("origin1", "text1"), ("origin2", "text2")])
 
70
        copy = content.copy()
 
71
        self.assertIsInstance(copy, KnitContent)
 
72
        self.assertEqual(copy.annotate(),
 
73
            [("origin1", "text1"), ("origin2", "text2")])
 
74
 
 
75
    def test_line_delta(self):
 
76
        content1 = KnitContent([("", "a"), ("", "b")])
 
77
        content2 = KnitContent([("", "a"), ("", "a"), ("", "c")])
 
78
        self.assertEqual(content1.line_delta(content2),
 
79
            [(1, 2, 2, [("", "a"), ("", "c")])])
 
80
 
 
81
    def test_line_delta_iter(self):
 
82
        content1 = KnitContent([("", "a"), ("", "b")])
 
83
        content2 = KnitContent([("", "a"), ("", "a"), ("", "c")])
 
84
        it = content1.line_delta_iter(content2)
 
85
        self.assertEqual(it.next(), (1, 2, 2, [("", "a"), ("", "c")]))
 
86
        self.assertRaises(StopIteration, it.next)
 
87
 
 
88
class KnitTests(TestCaseWithTransport):
 
89
    """Class containing knit test helper routines."""
 
90
 
 
91
    def make_test_knit(self, annotate=False, delay_create=False):
 
92
        if not annotate:
 
93
            factory = KnitPlainFactory()
 
94
        else:
 
95
            factory = None
 
96
        return KnitVersionedFile('test', get_transport('.'), access_mode='w',
 
97
                                 factory=factory, create=True,
 
98
                                 delay_create=delay_create)
 
99
 
 
100
 
 
101
class BasicKnitTests(KnitTests):
 
102
 
 
103
    def add_stock_one_and_one_a(self, k):
 
104
        k.add_lines('text-1', [], split_lines(TEXT_1))
 
105
        k.add_lines('text-1a', ['text-1'], split_lines(TEXT_1A))
 
106
 
 
107
    def test_knit_constructor(self):
 
108
        """Construct empty k"""
 
109
        self.make_test_knit()
 
110
 
 
111
    def test_knit_add(self):
 
112
        """Store one text in knit and retrieve"""
 
113
        k = self.make_test_knit()
 
114
        k.add_lines('text-1', [], split_lines(TEXT_1))
 
115
        self.assertTrue(k.has_version('text-1'))
 
116
        self.assertEqualDiff(''.join(k.get_lines('text-1')), TEXT_1)
 
117
 
 
118
    def test_knit_reload(self):
 
119
        # test that the content in a reloaded knit is correct
 
120
        k = self.make_test_knit()
 
121
        k.add_lines('text-1', [], split_lines(TEXT_1))
 
122
        del k
 
123
        k2 = KnitVersionedFile('test', get_transport('.'), access_mode='r', factory=KnitPlainFactory(), create=True)
 
124
        self.assertTrue(k2.has_version('text-1'))
 
125
        self.assertEqualDiff(''.join(k2.get_lines('text-1')), TEXT_1)
 
126
 
 
127
    def test_knit_several(self):
 
128
        """Store several texts in a knit"""
 
129
        k = self.make_test_knit()
 
130
        k.add_lines('text-1', [], split_lines(TEXT_1))
 
131
        k.add_lines('text-2', [], split_lines(TEXT_2))
 
132
        self.assertEqualDiff(''.join(k.get_lines('text-1')), TEXT_1)
 
133
        self.assertEqualDiff(''.join(k.get_lines('text-2')), TEXT_2)
 
134
        
 
135
    def test_repeated_add(self):
 
136
        """Knit traps attempt to replace existing version"""
 
137
        k = self.make_test_knit()
 
138
        k.add_lines('text-1', [], split_lines(TEXT_1))
 
139
        self.assertRaises(RevisionAlreadyPresent, 
 
140
                k.add_lines,
 
141
                'text-1', [], split_lines(TEXT_1))
 
142
 
 
143
    def test_empty(self):
 
144
        k = self.make_test_knit(True)
 
145
        k.add_lines('text-1', [], [])
 
146
        self.assertEquals(k.get_lines('text-1'), [])
 
147
 
 
148
    def test_incomplete(self):
 
149
        """Test if texts without a ending line-end can be inserted and
 
150
        extracted."""
 
151
        k = KnitVersionedFile('test', get_transport('.'), delta=False, create=True)
 
152
        k.add_lines('text-1', [], ['a\n',    'b'  ])
 
153
        k.add_lines('text-2', ['text-1'], ['a\rb\n', 'b\n'])
 
154
        # reopening ensures maximum room for confusion
 
155
        k = KnitVersionedFile('test', get_transport('.'), delta=False, create=True)
 
156
        self.assertEquals(k.get_lines('text-1'), ['a\n',    'b'  ])
 
157
        self.assertEquals(k.get_lines('text-2'), ['a\rb\n', 'b\n'])
 
158
 
 
159
    def test_delta(self):
 
160
        """Expression of knit delta as lines"""
 
161
        k = self.make_test_knit()
 
162
        td = list(line_delta(TEXT_1.splitlines(True),
 
163
                             TEXT_1A.splitlines(True)))
 
164
        self.assertEqualDiff(''.join(td), delta_1_1a)
 
165
        out = apply_line_delta(TEXT_1.splitlines(True), td)
 
166
        self.assertEqualDiff(''.join(out), TEXT_1A)
 
167
 
 
168
    def test_add_with_parents(self):
 
169
        """Store in knit with parents"""
 
170
        k = self.make_test_knit()
 
171
        self.add_stock_one_and_one_a(k)
 
172
        self.assertEquals(k.get_parents('text-1'), [])
 
173
        self.assertEquals(k.get_parents('text-1a'), ['text-1'])
 
174
 
 
175
    def test_ancestry(self):
 
176
        """Store in knit with parents"""
 
177
        k = self.make_test_knit()
 
178
        self.add_stock_one_and_one_a(k)
 
179
        self.assertEquals(set(k.get_ancestry(['text-1a'])), set(['text-1a', 'text-1']))
 
180
 
 
181
    def test_add_delta(self):
 
182
        """Store in knit with parents"""
 
183
        k = KnitVersionedFile('test', get_transport('.'), factory=KnitPlainFactory(),
 
184
            delta=True, create=True)
 
185
        self.add_stock_one_and_one_a(k)
 
186
        k.clear_cache()
 
187
        self.assertEqualDiff(''.join(k.get_lines('text-1a')), TEXT_1A)
 
188
 
 
189
    def test_annotate(self):
 
190
        """Annotations"""
 
191
        k = KnitVersionedFile('knit', get_transport('.'), factory=KnitAnnotateFactory(),
 
192
            delta=True, create=True)
 
193
        self.insert_and_test_small_annotate(k)
 
194
 
 
195
    def insert_and_test_small_annotate(self, k):
 
196
        """test annotation with k works correctly."""
 
197
        k.add_lines('text-1', [], ['a\n', 'b\n'])
 
198
        k.add_lines('text-2', ['text-1'], ['a\n', 'c\n'])
 
199
 
 
200
        origins = k.annotate('text-2')
 
201
        self.assertEquals(origins[0], ('text-1', 'a\n'))
 
202
        self.assertEquals(origins[1], ('text-2', 'c\n'))
 
203
 
 
204
    def test_annotate_fulltext(self):
 
205
        """Annotations"""
 
206
        k = KnitVersionedFile('knit', get_transport('.'), factory=KnitAnnotateFactory(),
 
207
            delta=False, create=True)
 
208
        self.insert_and_test_small_annotate(k)
 
209
 
 
210
    def test_annotate_merge_1(self):
 
211
        k = self.make_test_knit(True)
 
212
        k.add_lines('text-a1', [], ['a\n', 'b\n'])
 
213
        k.add_lines('text-a2', [], ['d\n', 'c\n'])
 
214
        k.add_lines('text-am', ['text-a1', 'text-a2'], ['d\n', 'b\n'])
 
215
        origins = k.annotate('text-am')
 
216
        self.assertEquals(origins[0], ('text-a2', 'd\n'))
 
217
        self.assertEquals(origins[1], ('text-a1', 'b\n'))
 
218
 
 
219
    def test_annotate_merge_2(self):
 
220
        k = self.make_test_knit(True)
 
221
        k.add_lines('text-a1', [], ['a\n', 'b\n', 'c\n'])
 
222
        k.add_lines('text-a2', [], ['x\n', 'y\n', 'z\n'])
 
223
        k.add_lines('text-am', ['text-a1', 'text-a2'], ['a\n', 'y\n', 'c\n'])
 
224
        origins = k.annotate('text-am')
 
225
        self.assertEquals(origins[0], ('text-a1', 'a\n'))
 
226
        self.assertEquals(origins[1], ('text-a2', 'y\n'))
 
227
        self.assertEquals(origins[2], ('text-a1', 'c\n'))
 
228
 
 
229
    def test_annotate_merge_9(self):
 
230
        k = self.make_test_knit(True)
 
231
        k.add_lines('text-a1', [], ['a\n', 'b\n', 'c\n'])
 
232
        k.add_lines('text-a2', [], ['x\n', 'y\n', 'z\n'])
 
233
        k.add_lines('text-am', ['text-a1', 'text-a2'], ['k\n', 'y\n', 'c\n'])
 
234
        origins = k.annotate('text-am')
 
235
        self.assertEquals(origins[0], ('text-am', 'k\n'))
 
236
        self.assertEquals(origins[1], ('text-a2', 'y\n'))
 
237
        self.assertEquals(origins[2], ('text-a1', 'c\n'))
 
238
 
 
239
    def test_annotate_merge_3(self):
 
240
        k = self.make_test_knit(True)
 
241
        k.add_lines('text-a1', [], ['a\n', 'b\n', 'c\n'])
 
242
        k.add_lines('text-a2', [] ,['x\n', 'y\n', 'z\n'])
 
243
        k.add_lines('text-am', ['text-a1', 'text-a2'], ['k\n', 'y\n', 'z\n'])
 
244
        origins = k.annotate('text-am')
 
245
        self.assertEquals(origins[0], ('text-am', 'k\n'))
 
246
        self.assertEquals(origins[1], ('text-a2', 'y\n'))
 
247
        self.assertEquals(origins[2], ('text-a2', 'z\n'))
 
248
 
 
249
    def test_annotate_merge_4(self):
 
250
        k = self.make_test_knit(True)
 
251
        k.add_lines('text-a1', [], ['a\n', 'b\n', 'c\n'])
 
252
        k.add_lines('text-a2', [], ['x\n', 'y\n', 'z\n'])
 
253
        k.add_lines('text-a3', ['text-a1'], ['a\n', 'b\n', 'p\n'])
 
254
        k.add_lines('text-am', ['text-a2', 'text-a3'], ['a\n', 'b\n', 'z\n'])
 
255
        origins = k.annotate('text-am')
 
256
        self.assertEquals(origins[0], ('text-a1', 'a\n'))
 
257
        self.assertEquals(origins[1], ('text-a1', 'b\n'))
 
258
        self.assertEquals(origins[2], ('text-a2', 'z\n'))
 
259
 
 
260
    def test_annotate_merge_5(self):
 
261
        k = self.make_test_knit(True)
 
262
        k.add_lines('text-a1', [], ['a\n', 'b\n', 'c\n'])
 
263
        k.add_lines('text-a2', [], ['d\n', 'e\n', 'f\n'])
 
264
        k.add_lines('text-a3', [], ['x\n', 'y\n', 'z\n'])
 
265
        k.add_lines('text-am',
 
266
                    ['text-a1', 'text-a2', 'text-a3'],
 
267
                    ['a\n', 'e\n', 'z\n'])
 
268
        origins = k.annotate('text-am')
 
269
        self.assertEquals(origins[0], ('text-a1', 'a\n'))
 
270
        self.assertEquals(origins[1], ('text-a2', 'e\n'))
 
271
        self.assertEquals(origins[2], ('text-a3', 'z\n'))
 
272
 
 
273
    def test_annotate_file_cherry_pick(self):
 
274
        k = self.make_test_knit(True)
 
275
        k.add_lines('text-1', [], ['a\n', 'b\n', 'c\n'])
 
276
        k.add_lines('text-2', ['text-1'], ['d\n', 'e\n', 'f\n'])
 
277
        k.add_lines('text-3', ['text-2', 'text-1'], ['a\n', 'b\n', 'c\n'])
 
278
        origins = k.annotate('text-3')
 
279
        self.assertEquals(origins[0], ('text-1', 'a\n'))
 
280
        self.assertEquals(origins[1], ('text-1', 'b\n'))
 
281
        self.assertEquals(origins[2], ('text-1', 'c\n'))
 
282
 
 
283
    def test_knit_join(self):
 
284
        """Store in knit with parents"""
 
285
        k1 = KnitVersionedFile('test1', get_transport('.'), factory=KnitPlainFactory(), create=True)
 
286
        k1.add_lines('text-a', [], split_lines(TEXT_1))
 
287
        k1.add_lines('text-b', ['text-a'], split_lines(TEXT_1))
 
288
 
 
289
        k1.add_lines('text-c', [], split_lines(TEXT_1))
 
290
        k1.add_lines('text-d', ['text-c'], split_lines(TEXT_1))
 
291
 
 
292
        k1.add_lines('text-m', ['text-b', 'text-d'], split_lines(TEXT_1))
 
293
 
 
294
        k2 = KnitVersionedFile('test2', get_transport('.'), factory=KnitPlainFactory(), create=True)
 
295
        count = k2.join(k1, version_ids=['text-m'])
 
296
        self.assertEquals(count, 5)
 
297
        self.assertTrue(k2.has_version('text-a'))
 
298
        self.assertTrue(k2.has_version('text-c'))
 
299
 
 
300
    def test_reannotate(self):
 
301
        k1 = KnitVersionedFile('knit1', get_transport('.'),
 
302
                               factory=KnitAnnotateFactory(), create=True)
 
303
        # 0
 
304
        k1.add_lines('text-a', [], ['a\n', 'b\n'])
 
305
        # 1
 
306
        k1.add_lines('text-b', ['text-a'], ['a\n', 'c\n'])
 
307
 
 
308
        k2 = KnitVersionedFile('test2', get_transport('.'),
 
309
                               factory=KnitAnnotateFactory(), create=True)
 
310
        k2.join(k1, version_ids=['text-b'])
 
311
 
 
312
        # 2
 
313
        k1.add_lines('text-X', ['text-b'], ['a\n', 'b\n'])
 
314
        # 2
 
315
        k2.add_lines('text-c', ['text-b'], ['z\n', 'c\n'])
 
316
        # 3
 
317
        k2.add_lines('text-Y', ['text-b'], ['b\n', 'c\n'])
 
318
 
 
319
        # test-c will have index 3
 
320
        k1.join(k2, version_ids=['text-c'])
 
321
 
 
322
        lines = k1.get_lines('text-c')
 
323
        self.assertEquals(lines, ['z\n', 'c\n'])
 
324
 
 
325
        origins = k1.annotate('text-c')
 
326
        self.assertEquals(origins[0], ('text-c', 'z\n'))
 
327
        self.assertEquals(origins[1], ('text-b', 'c\n'))
 
328
 
 
329
    def test_get_line_delta_texts(self):
 
330
        """Make sure we can call get_texts on text with reused line deltas"""
 
331
        k1 = KnitVersionedFile('test1', get_transport('.'), 
 
332
                               factory=KnitPlainFactory(), create=True)
 
333
        for t in range(3):
 
334
            if t == 0:
 
335
                parents = []
 
336
            else:
 
337
                parents = ['%d' % (t-1)]
 
338
            k1.add_lines('%d' % t, parents, ['hello\n'] * t)
 
339
        k1.get_texts(('%d' % t) for t in range(3))
 
340
        
 
341
    def test_iter_lines_reads_in_order(self):
 
342
        t = MemoryTransport()
 
343
        instrumented_t = TransportLogger(t)
 
344
        k1 = KnitVersionedFile('id', instrumented_t, create=True, delta=True)
 
345
        self.assertEqual([('id.kndx',)], instrumented_t._calls)
 
346
        # add texts with no required ordering
 
347
        k1.add_lines('base', [], ['text\n'])
 
348
        k1.add_lines('base2', [], ['text2\n'])
 
349
        k1.clear_cache()
 
350
        instrumented_t._calls = []
 
351
        # request a last-first iteration
 
352
        results = list(k1.iter_lines_added_or_present_in_versions(['base2', 'base']))
 
353
        self.assertEqual([('id.knit', [(0, 87), (87, 89)])], instrumented_t._calls)
 
354
        self.assertEqual(['text\n', 'text2\n'], results)
 
355
 
 
356
    def test_create_empty_annotated(self):
 
357
        k1 = self.make_test_knit(True)
 
358
        # 0
 
359
        k1.add_lines('text-a', [], ['a\n', 'b\n'])
 
360
        k2 = k1.create_empty('t', MemoryTransport())
 
361
        self.assertTrue(isinstance(k2.factory, KnitAnnotateFactory))
 
362
        self.assertEqual(k1.delta, k2.delta)
 
363
        # the generic test checks for empty content and file class
 
364
 
 
365
    def test_knit_format(self):
 
366
        # this tests that a new knit index file has the expected content
 
367
        # and that is writes the data we expect as records are added.
 
368
        knit = self.make_test_knit(True)
 
369
        # Now knit files are not created until we first add data to them
 
370
        self.assertFileEqual("# bzr knit index 8\n", 'test.kndx')
 
371
        knit.add_lines_with_ghosts('revid', ['a_ghost'], ['a\n'])
 
372
        self.assertFileEqual(
 
373
            "# bzr knit index 8\n"
 
374
            "\n"
 
375
            "revid fulltext 0 84 .a_ghost :",
 
376
            'test.kndx')
 
377
        knit.add_lines_with_ghosts('revid2', ['revid'], ['a\n'])
 
378
        self.assertFileEqual(
 
379
            "# bzr knit index 8\n"
 
380
            "\nrevid fulltext 0 84 .a_ghost :"
 
381
            "\nrevid2 line-delta 84 82 0 :",
 
382
            'test.kndx')
 
383
        # we should be able to load this file again
 
384
        knit = KnitVersionedFile('test', get_transport('.'), access_mode='r')
 
385
        self.assertEqual(['revid', 'revid2'], knit.versions())
 
386
        # write a short write to the file and ensure that its ignored
 
387
        indexfile = file('test.kndx', 'at')
 
388
        indexfile.write('\nrevid3 line-delta 166 82 1 2 3 4 5 .phwoar:demo ')
 
389
        indexfile.close()
 
390
        # we should be able to load this file again
 
391
        knit = KnitVersionedFile('test', get_transport('.'), access_mode='w')
 
392
        self.assertEqual(['revid', 'revid2'], knit.versions())
 
393
        # and add a revision with the same id the failed write had
 
394
        knit.add_lines('revid3', ['revid2'], ['a\n'])
 
395
        # and when reading it revid3 should now appear.
 
396
        knit = KnitVersionedFile('test', get_transport('.'), access_mode='r')
 
397
        self.assertEqual(['revid', 'revid2', 'revid3'], knit.versions())
 
398
        self.assertEqual(['revid2'], knit.get_parents('revid3'))
 
399
 
 
400
    def test_delay_create(self):
 
401
        """Test that passing delay_create=True creates files late"""
 
402
        knit = self.make_test_knit(annotate=True, delay_create=True)
 
403
        self.failIfExists('test.knit')
 
404
        self.failIfExists('test.kndx')
 
405
        knit.add_lines_with_ghosts('revid', ['a_ghost'], ['a\n'])
 
406
        self.failUnlessExists('test.knit')
 
407
        self.assertFileEqual(
 
408
            "# bzr knit index 8\n"
 
409
            "\n"
 
410
            "revid fulltext 0 84 .a_ghost :",
 
411
            'test.kndx')
 
412
 
 
413
    def test_create_parent_dir(self):
 
414
        """create_parent_dir can create knits in nonexistant dirs"""
 
415
        # Has no effect if we don't set 'delay_create'
 
416
        trans = get_transport('.')
 
417
        self.assertRaises(NoSuchFile, KnitVersionedFile, 'dir/test',
 
418
                          trans, access_mode='w', factory=None,
 
419
                          create=True, create_parent_dir=True)
 
420
        # Nothing should have changed yet
 
421
        knit = KnitVersionedFile('dir/test', trans, access_mode='w',
 
422
                                 factory=None, create=True,
 
423
                                 create_parent_dir=True,
 
424
                                 delay_create=True)
 
425
        self.failIfExists('dir/test.knit')
 
426
        self.failIfExists('dir/test.kndx')
 
427
        self.failIfExists('dir')
 
428
        knit.add_lines('revid', [], ['a\n'])
 
429
        self.failUnlessExists('dir')
 
430
        self.failUnlessExists('dir/test.knit')
 
431
        self.assertFileEqual(
 
432
            "# bzr knit index 8\n"
 
433
            "\n"
 
434
            "revid fulltext 0 84  :",
 
435
            'dir/test.kndx')
 
436
 
 
437
    def test_create_mode_700(self):
 
438
        trans = get_transport('.')
 
439
        if not trans._can_roundtrip_unix_modebits():
 
440
            # Can't roundtrip, so no need to run this test
 
441
            return
 
442
        knit = KnitVersionedFile('dir/test', trans, access_mode='w',
 
443
                                 factory=None, create=True,
 
444
                                 create_parent_dir=True,
 
445
                                 delay_create=True,
 
446
                                 file_mode=0600,
 
447
                                 dir_mode=0700)
 
448
        knit.add_lines('revid', [], ['a\n'])
 
449
        self.assertTransportMode(trans, 'dir', 0700)
 
450
        self.assertTransportMode(trans, 'dir/test.knit', 0600)
 
451
        self.assertTransportMode(trans, 'dir/test.kndx', 0600)
 
452
 
 
453
    def test_create_mode_770(self):
 
454
        trans = get_transport('.')
 
455
        if not trans._can_roundtrip_unix_modebits():
 
456
            # Can't roundtrip, so no need to run this test
 
457
            return
 
458
        knit = KnitVersionedFile('dir/test', trans, access_mode='w',
 
459
                                 factory=None, create=True,
 
460
                                 create_parent_dir=True,
 
461
                                 delay_create=True,
 
462
                                 file_mode=0660,
 
463
                                 dir_mode=0770)
 
464
        knit.add_lines('revid', [], ['a\n'])
 
465
        self.assertTransportMode(trans, 'dir', 0770)
 
466
        self.assertTransportMode(trans, 'dir/test.knit', 0660)
 
467
        self.assertTransportMode(trans, 'dir/test.kndx', 0660)
 
468
 
 
469
    def test_create_mode_777(self):
 
470
        trans = get_transport('.')
 
471
        if not trans._can_roundtrip_unix_modebits():
 
472
            # Can't roundtrip, so no need to run this test
 
473
            return
 
474
        knit = KnitVersionedFile('dir/test', trans, access_mode='w',
 
475
                                 factory=None, create=True,
 
476
                                 create_parent_dir=True,
 
477
                                 delay_create=True,
 
478
                                 file_mode=0666,
 
479
                                 dir_mode=0777)
 
480
        knit.add_lines('revid', [], ['a\n'])
 
481
        self.assertTransportMode(trans, 'dir', 0777)
 
482
        self.assertTransportMode(trans, 'dir/test.knit', 0666)
 
483
        self.assertTransportMode(trans, 'dir/test.kndx', 0666)
 
484
 
 
485
    def test_plan_merge(self):
 
486
        my_knit = self.make_test_knit(annotate=True)
 
487
        my_knit.add_lines('text1', [], split_lines(TEXT_1))
 
488
        my_knit.add_lines('text1a', ['text1'], split_lines(TEXT_1A))
 
489
        my_knit.add_lines('text1b', ['text1'], split_lines(TEXT_1B))
 
490
        plan = list(my_knit.plan_merge('text1a', 'text1b'))
 
491
        for plan_line, expected_line in zip(plan, AB_MERGE):
 
492
            self.assertEqual(plan_line, expected_line)
 
493
 
 
494
 
 
495
TEXT_1 = """\
 
496
Banana cup cakes:
 
497
 
 
498
- bananas
 
499
- eggs
 
500
- broken tea cups
 
501
"""
 
502
 
 
503
TEXT_1A = """\
 
504
Banana cup cake recipe
 
505
(serves 6)
 
506
 
 
507
- bananas
 
508
- eggs
 
509
- broken tea cups
 
510
- self-raising flour
 
511
"""
 
512
 
 
513
TEXT_1B = """\
 
514
Banana cup cake recipe
 
515
 
 
516
- bananas (do not use plantains!!!)
 
517
- broken tea cups
 
518
- flour
 
519
"""
 
520
 
 
521
delta_1_1a = """\
 
522
0,1,2
 
523
Banana cup cake recipe
 
524
(serves 6)
 
525
5,5,1
 
526
- self-raising flour
 
527
"""
 
528
 
 
529
TEXT_2 = """\
 
530
Boeuf bourguignon
 
531
 
 
532
- beef
 
533
- red wine
 
534
- small onions
 
535
- carrot
 
536
- mushrooms
 
537
"""
 
538
 
 
539
AB_MERGE_TEXT="""unchanged|Banana cup cake recipe
 
540
new-a|(serves 6)
 
541
unchanged|
 
542
killed-b|- bananas
 
543
killed-b|- eggs
 
544
new-b|- bananas (do not use plantains!!!)
 
545
unchanged|- broken tea cups
 
546
new-a|- self-raising flour
 
547
new-b|- flour
 
548
"""
 
549
AB_MERGE=[tuple(l.split('|')) for l in AB_MERGE_TEXT.splitlines(True)]
 
550
 
 
551
 
 
552
def line_delta(from_lines, to_lines):
 
553
    """Generate line-based delta from one text to another"""
 
554
    s = difflib.SequenceMatcher(None, from_lines, to_lines)
 
555
    for op in s.get_opcodes():
 
556
        if op[0] == 'equal':
 
557
            continue
 
558
        yield '%d,%d,%d\n' % (op[1], op[2], op[4]-op[3])
 
559
        for i in range(op[3], op[4]):
 
560
            yield to_lines[i]
 
561
 
 
562
 
 
563
def apply_line_delta(basis_lines, delta_lines):
 
564
    """Apply a line-based perfect diff
 
565
    
 
566
    basis_lines -- text to apply the patch to
 
567
    delta_lines -- diff instructions and content
 
568
    """
 
569
    out = basis_lines[:]
 
570
    i = 0
 
571
    offset = 0
 
572
    while i < len(delta_lines):
 
573
        l = delta_lines[i]
 
574
        a, b, c = map(long, l.split(','))
 
575
        i = i + 1
 
576
        out[offset+a:offset+b] = delta_lines[i:i+c]
 
577
        i = i + c
 
578
        offset = offset + (b - a) + c
 
579
    return out
 
580
 
 
581
 
 
582
class TestWeaveToKnit(KnitTests):
 
583
 
 
584
    def test_weave_to_knit_matches(self):
 
585
        # check that the WeaveToKnit is_compatible function
 
586
        # registers True for a Weave to a Knit.
 
587
        w = Weave()
 
588
        k = self.make_test_knit()
 
589
        self.failUnless(WeaveToKnit.is_compatible(w, k))
 
590
        self.failIf(WeaveToKnit.is_compatible(k, w))
 
591
        self.failIf(WeaveToKnit.is_compatible(w, w))
 
592
        self.failIf(WeaveToKnit.is_compatible(k, k))
 
593
 
 
594
 
 
595
class TestKnitCaching(KnitTests):
 
596
    
 
597
    def create_knit(self, cache_add=False):
 
598
        k = self.make_test_knit(True)
 
599
        if cache_add:
 
600
            k.enable_cache()
 
601
 
 
602
        k.add_lines('text-1', [], split_lines(TEXT_1))
 
603
        k.add_lines('text-2', [], split_lines(TEXT_2))
 
604
        return k
 
605
 
 
606
    def test_no_caching(self):
 
607
        k = self.create_knit()
 
608
        # Nothing should be cached without setting 'enable_cache'
 
609
        self.assertEqual({}, k._data._cache)
 
610
 
 
611
    def test_cache_add_and_clear(self):
 
612
        k = self.create_knit(True)
 
613
 
 
614
        self.assertEqual(['text-1', 'text-2'], sorted(k._data._cache.keys()))
 
615
 
 
616
        k.clear_cache()
 
617
        self.assertEqual({}, k._data._cache)
 
618
 
 
619
    def test_cache_data_read_raw(self):
 
620
        k = self.create_knit()
 
621
 
 
622
        # Now cache and read
 
623
        k.enable_cache()
 
624
 
 
625
        def read_one_raw(version):
 
626
            pos_map = k._get_components_positions([version])
 
627
            method, pos, size, next = pos_map[version]
 
628
            lst = list(k._data.read_records_iter_raw([(version, pos, size)]))
 
629
            self.assertEqual(1, len(lst))
 
630
            return lst[0]
 
631
 
 
632
        val = read_one_raw('text-1')
 
633
        self.assertEqual({'text-1':val[1]}, k._data._cache)
 
634
 
 
635
        k.clear_cache()
 
636
        # After clear, new reads are not cached
 
637
        self.assertEqual({}, k._data._cache)
 
638
 
 
639
        val2 = read_one_raw('text-1')
 
640
        self.assertEqual(val, val2)
 
641
        self.assertEqual({}, k._data._cache)
 
642
 
 
643
    def test_cache_data_read(self):
 
644
        k = self.create_knit()
 
645
 
 
646
        def read_one(version):
 
647
            pos_map = k._get_components_positions([version])
 
648
            method, pos, size, next = pos_map[version]
 
649
            lst = list(k._data.read_records_iter([(version, pos, size)]))
 
650
            self.assertEqual(1, len(lst))
 
651
            return lst[0]
 
652
 
 
653
        # Now cache and read
 
654
        k.enable_cache()
 
655
 
 
656
        val = read_one('text-2')
 
657
        self.assertEqual(['text-2'], k._data._cache.keys())
 
658
        self.assertEqual('text-2', val[0])
 
659
        content, digest = k._data._parse_record('text-2',
 
660
                                                k._data._cache['text-2'])
 
661
        self.assertEqual(content, val[1])
 
662
        self.assertEqual(digest, val[2])
 
663
 
 
664
        k.clear_cache()
 
665
        self.assertEqual({}, k._data._cache)
 
666
 
 
667
        val2 = read_one('text-2')
 
668
        self.assertEqual(val, val2)
 
669
        self.assertEqual({}, k._data._cache)
 
670
 
 
671
    def test_cache_read(self):
 
672
        k = self.create_knit()
 
673
        k.enable_cache()
 
674
 
 
675
        text = k.get_text('text-1')
 
676
        self.assertEqual(TEXT_1, text)
 
677
        self.assertEqual(['text-1'], k._data._cache.keys())
 
678
 
 
679
        k.clear_cache()
 
680
        self.assertEqual({}, k._data._cache)
 
681
 
 
682
        text = k.get_text('text-1')
 
683
        self.assertEqual(TEXT_1, text)
 
684
        self.assertEqual({}, k._data._cache)
 
685
 
 
686
 
 
687
class TestKnitIndex(KnitTests):
 
688
 
 
689
    def test_add_versions_dictionary_compresses(self):
 
690
        """Adding versions to the index should update the lookup dict"""
 
691
        knit = self.make_test_knit()
 
692
        idx = knit._index
 
693
        idx.add_version('a-1', ['fulltext'], 0, 0, [])
 
694
        self.check_file_contents('test.kndx',
 
695
            '# bzr knit index 8\n'
 
696
            '\n'
 
697
            'a-1 fulltext 0 0  :'
 
698
            )
 
699
        idx.add_versions([('a-2', ['fulltext'], 0, 0, ['a-1']),
 
700
                          ('a-3', ['fulltext'], 0, 0, ['a-2']),
 
701
                         ])
 
702
        self.check_file_contents('test.kndx',
 
703
            '# bzr knit index 8\n'
 
704
            '\n'
 
705
            'a-1 fulltext 0 0  :\n'
 
706
            'a-2 fulltext 0 0 0 :\n'
 
707
            'a-3 fulltext 0 0 1 :'
 
708
            )
 
709
        self.assertEqual(['a-1', 'a-2', 'a-3'], idx._history)
 
710
        self.assertEqual({'a-1':('a-1', ['fulltext'], 0, 0, [], 0),
 
711
                          'a-2':('a-2', ['fulltext'], 0, 0, ['a-1'], 1),
 
712
                          'a-3':('a-3', ['fulltext'], 0, 0, ['a-2'], 2),
 
713
                         }, idx._cache)
 
714
 
 
715
    def test_add_versions_fails_clean(self):
 
716
        """If add_versions fails in the middle, it restores a pristine state.
 
717
 
 
718
        Any modifications that are made to the index are reset if all versions
 
719
        cannot be added.
 
720
        """
 
721
        # This cheats a little bit by passing in a generator which will
 
722
        # raise an exception before the processing finishes
 
723
        # Other possibilities would be to have an version with the wrong number
 
724
        # of entries, or to make the backing transport unable to write any
 
725
        # files.
 
726
 
 
727
        knit = self.make_test_knit()
 
728
        idx = knit._index
 
729
        idx.add_version('a-1', ['fulltext'], 0, 0, [])
 
730
 
 
731
        class StopEarly(Exception):
 
732
            pass
 
733
 
 
734
        def generate_failure():
 
735
            """Add some entries and then raise an exception"""
 
736
            yield ('a-2', ['fulltext'], 0, 0, ['a-1'])
 
737
            yield ('a-3', ['fulltext'], 0, 0, ['a-2'])
 
738
            raise StopEarly()
 
739
 
 
740
        # Assert the pre-condition
 
741
        self.assertEqual(['a-1'], idx._history)
 
742
        self.assertEqual({'a-1':('a-1', ['fulltext'], 0, 0, [], 0)}, idx._cache)
 
743
 
 
744
        self.assertRaises(StopEarly, idx.add_versions, generate_failure())
 
745
 
 
746
        # And it shouldn't be modified
 
747
        self.assertEqual(['a-1'], idx._history)
 
748
        self.assertEqual({'a-1':('a-1', ['fulltext'], 0, 0, [], 0)}, idx._cache)