/brz/remove-bazaar

To get this branch, use:
bzr branch http://gegoxaren.bato24.eu/bzr/brz/remove-bazaar

« back to all changes in this revision

Viewing changes to bzrlib/tests/test_knit.py

  • Committer: v.ladeuil+lp at free
  • Date: 2006-10-12 14:29:32 UTC
  • mto: (2145.1.1 keepalive)
  • mto: This revision was merged to the branch mainline in revision 2146.
  • Revision ID: v.ladeuil+lp@free.fr-20061012142932-7221fe16d2b48fa3
Shuffle http related test code. Hopefully it ends up at the right place :)

* bzrlib/tests/HttpServer.py: 
New file. bzrlib.tests.ChrootedTestCase use HttpServer. So the
class can't be defined in bzrlib.tests.HTTPUtils because it
creates a circular dependency (bzrlib.tests.HTTPUtils needs to
import bzrlib.tests).

* bzrlib/transport/http/_urllib.py: 
Transfer test server definition to bzrlib.tests.HttpServer. Clean
up imports.

* bzrlib/transport/http/_pycurl.py: 
Transfer test server definition to bzrlib.tests.HttpServer. Clean
up imports.

* bzrlib/transport/http/__init__.py: 
Transfer all test related code to either bzrlib.tests.HttpServer
and bzrlib.tests.HTTPUtils.
Fix all use of TransportNotPossible and InvalidURL by prefixing it
by 'errors.' (this seems to be the preferred way in the rest of
bzr).
Get rid of unused imports.

* bzrlib/tests/test_transport.py:
(ReadonlyDecoratorTransportTest.test_local_parameters,
FakeNFSDecoratorTests.test_http_parameters): Use HttpServer from
bzrlib.tests.HttpServer instead of bzrlib.transport.http.

* bzrlib/tests/test_sftp_transport.py:
(set_test_transport_to_sftp): Use HttpServer from
bzrlib.tests.HttpServer instead of bzrlib.transport.http.

* bzrlib/tests/test_selftest.py:
(TestTestCaseWithTransport.test_get_readonly_url_http): Use
HttpServer from bzrlib.tests.HttpServer instead of
bzrlib.transport.http.

* bzrlib/tests/test_repository.py: 
Does *not* use HttpServer.

* bzrlib/tests/test_http.py: 
Build on top of bzrlib.tests.HttpServer and bzrlib.tests.HTTPUtils
instead of bzrlib.transport.http.

* bzrlib/tests/test_bzrdir.py:
(ChrootedTests.setUp): Use HttpServer from bzrlib.tests.HttpServer
instead of bzrlib.transport.http.

* bzrlib/tests/branch_implementations/test_http.py:
(HTTPBranchTests.setUp): Use HttpServer from bzrlib.tests.HttpServer
instead of bzrlib.transport.http.

* bzrlib/tests/branch_implementations/test_branch.py:
(ChrootedTests.setUp): Use HttpServer from bzrlib.tests.HttpServer
instead of bzrlib.transport.http.

* bzrlib/tests/__init__.py:
(ChrootedTestCase.setUp): Use HttpServer from
bzrlib.tests.HttpServer instead of bzrlib.transport.http.

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
# Copyright (C) 2005, 2006 by Canonical Ltd
 
2
#
 
3
# This program is free software; you can redistribute it and/or modify
 
4
# it under the terms of the GNU General Public License as published by
 
5
# the Free Software Foundation; either version 2 of the License, or
 
6
# (at your option) any later version.
 
7
#
 
8
# This program is distributed in the hope that it will be useful,
 
9
# but WITHOUT ANY WARRANTY; without even the implied warranty of
 
10
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
11
# GNU General Public License for more details.
 
12
#
 
13
# You should have received a copy of the GNU General Public License
 
14
# along with this program; if not, write to the Free Software
 
15
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
 
16
 
 
17
"""Tests for Knit data structure"""
 
18
 
 
19
 
 
20
import difflib
 
21
 
 
22
 
 
23
from bzrlib.errors import KnitError, RevisionAlreadyPresent, NoSuchFile
 
24
from bzrlib.knit import (
 
25
    KnitVersionedFile,
 
26
    KnitPlainFactory,
 
27
    KnitAnnotateFactory,
 
28
    WeaveToKnit)
 
29
from bzrlib.osutils import split_lines
 
30
from bzrlib.tests import TestCaseWithTransport
 
31
from bzrlib.transport import TransportLogger, get_transport
 
32
from bzrlib.transport.memory import MemoryTransport
 
33
from bzrlib.weave import Weave
 
34
 
 
35
 
 
36
class KnitTests(TestCaseWithTransport):
 
37
    """Class containing knit test helper routines."""
 
38
 
 
39
    def make_test_knit(self, annotate=False, delay_create=False):
 
40
        if not annotate:
 
41
            factory = KnitPlainFactory()
 
42
        else:
 
43
            factory = None
 
44
        return KnitVersionedFile('test', get_transport('.'), access_mode='w',
 
45
                                 factory=factory, create=True,
 
46
                                 delay_create=delay_create)
 
47
 
 
48
 
 
49
class BasicKnitTests(KnitTests):
 
50
 
 
51
    def add_stock_one_and_one_a(self, k):
 
52
        k.add_lines('text-1', [], split_lines(TEXT_1))
 
53
        k.add_lines('text-1a', ['text-1'], split_lines(TEXT_1A))
 
54
 
 
55
    def test_knit_constructor(self):
 
56
        """Construct empty k"""
 
57
        self.make_test_knit()
 
58
 
 
59
    def test_knit_add(self):
 
60
        """Store one text in knit and retrieve"""
 
61
        k = self.make_test_knit()
 
62
        k.add_lines('text-1', [], split_lines(TEXT_1))
 
63
        self.assertTrue(k.has_version('text-1'))
 
64
        self.assertEqualDiff(''.join(k.get_lines('text-1')), TEXT_1)
 
65
 
 
66
    def test_knit_reload(self):
 
67
        # test that the content in a reloaded knit is correct
 
68
        k = self.make_test_knit()
 
69
        k.add_lines('text-1', [], split_lines(TEXT_1))
 
70
        del k
 
71
        k2 = KnitVersionedFile('test', get_transport('.'), access_mode='r', factory=KnitPlainFactory(), create=True)
 
72
        self.assertTrue(k2.has_version('text-1'))
 
73
        self.assertEqualDiff(''.join(k2.get_lines('text-1')), TEXT_1)
 
74
 
 
75
    def test_knit_several(self):
 
76
        """Store several texts in a knit"""
 
77
        k = self.make_test_knit()
 
78
        k.add_lines('text-1', [], split_lines(TEXT_1))
 
79
        k.add_lines('text-2', [], split_lines(TEXT_2))
 
80
        self.assertEqualDiff(''.join(k.get_lines('text-1')), TEXT_1)
 
81
        self.assertEqualDiff(''.join(k.get_lines('text-2')), TEXT_2)
 
82
        
 
83
    def test_repeated_add(self):
 
84
        """Knit traps attempt to replace existing version"""
 
85
        k = self.make_test_knit()
 
86
        k.add_lines('text-1', [], split_lines(TEXT_1))
 
87
        self.assertRaises(RevisionAlreadyPresent, 
 
88
                k.add_lines,
 
89
                'text-1', [], split_lines(TEXT_1))
 
90
 
 
91
    def test_empty(self):
 
92
        k = self.make_test_knit(True)
 
93
        k.add_lines('text-1', [], [])
 
94
        self.assertEquals(k.get_lines('text-1'), [])
 
95
 
 
96
    def test_incomplete(self):
 
97
        """Test if texts without a ending line-end can be inserted and
 
98
        extracted."""
 
99
        k = KnitVersionedFile('test', get_transport('.'), delta=False, create=True)
 
100
        k.add_lines('text-1', [], ['a\n',    'b'  ])
 
101
        k.add_lines('text-2', ['text-1'], ['a\rb\n', 'b\n'])
 
102
        # reopening ensures maximum room for confusion
 
103
        k = KnitVersionedFile('test', get_transport('.'), delta=False, create=True)
 
104
        self.assertEquals(k.get_lines('text-1'), ['a\n',    'b'  ])
 
105
        self.assertEquals(k.get_lines('text-2'), ['a\rb\n', 'b\n'])
 
106
 
 
107
    def test_delta(self):
 
108
        """Expression of knit delta as lines"""
 
109
        k = self.make_test_knit()
 
110
        td = list(line_delta(TEXT_1.splitlines(True),
 
111
                             TEXT_1A.splitlines(True)))
 
112
        self.assertEqualDiff(''.join(td), delta_1_1a)
 
113
        out = apply_line_delta(TEXT_1.splitlines(True), td)
 
114
        self.assertEqualDiff(''.join(out), TEXT_1A)
 
115
 
 
116
    def test_add_with_parents(self):
 
117
        """Store in knit with parents"""
 
118
        k = self.make_test_knit()
 
119
        self.add_stock_one_and_one_a(k)
 
120
        self.assertEquals(k.get_parents('text-1'), [])
 
121
        self.assertEquals(k.get_parents('text-1a'), ['text-1'])
 
122
 
 
123
    def test_ancestry(self):
 
124
        """Store in knit with parents"""
 
125
        k = self.make_test_knit()
 
126
        self.add_stock_one_and_one_a(k)
 
127
        self.assertEquals(set(k.get_ancestry(['text-1a'])), set(['text-1a', 'text-1']))
 
128
 
 
129
    def test_add_delta(self):
 
130
        """Store in knit with parents"""
 
131
        k = KnitVersionedFile('test', get_transport('.'), factory=KnitPlainFactory(),
 
132
            delta=True, create=True)
 
133
        self.add_stock_one_and_one_a(k)
 
134
        k.clear_cache()
 
135
        self.assertEqualDiff(''.join(k.get_lines('text-1a')), TEXT_1A)
 
136
 
 
137
    def test_annotate(self):
 
138
        """Annotations"""
 
139
        k = KnitVersionedFile('knit', get_transport('.'), factory=KnitAnnotateFactory(),
 
140
            delta=True, create=True)
 
141
        self.insert_and_test_small_annotate(k)
 
142
 
 
143
    def insert_and_test_small_annotate(self, k):
 
144
        """test annotation with k works correctly."""
 
145
        k.add_lines('text-1', [], ['a\n', 'b\n'])
 
146
        k.add_lines('text-2', ['text-1'], ['a\n', 'c\n'])
 
147
 
 
148
        origins = k.annotate('text-2')
 
149
        self.assertEquals(origins[0], ('text-1', 'a\n'))
 
150
        self.assertEquals(origins[1], ('text-2', 'c\n'))
 
151
 
 
152
    def test_annotate_fulltext(self):
 
153
        """Annotations"""
 
154
        k = KnitVersionedFile('knit', get_transport('.'), factory=KnitAnnotateFactory(),
 
155
            delta=False, create=True)
 
156
        self.insert_and_test_small_annotate(k)
 
157
 
 
158
    def test_annotate_merge_1(self):
 
159
        k = self.make_test_knit(True)
 
160
        k.add_lines('text-a1', [], ['a\n', 'b\n'])
 
161
        k.add_lines('text-a2', [], ['d\n', 'c\n'])
 
162
        k.add_lines('text-am', ['text-a1', 'text-a2'], ['d\n', 'b\n'])
 
163
        origins = k.annotate('text-am')
 
164
        self.assertEquals(origins[0], ('text-a2', 'd\n'))
 
165
        self.assertEquals(origins[1], ('text-a1', 'b\n'))
 
166
 
 
167
    def test_annotate_merge_2(self):
 
168
        k = self.make_test_knit(True)
 
169
        k.add_lines('text-a1', [], ['a\n', 'b\n', 'c\n'])
 
170
        k.add_lines('text-a2', [], ['x\n', 'y\n', 'z\n'])
 
171
        k.add_lines('text-am', ['text-a1', 'text-a2'], ['a\n', 'y\n', 'c\n'])
 
172
        origins = k.annotate('text-am')
 
173
        self.assertEquals(origins[0], ('text-a1', 'a\n'))
 
174
        self.assertEquals(origins[1], ('text-a2', 'y\n'))
 
175
        self.assertEquals(origins[2], ('text-a1', 'c\n'))
 
176
 
 
177
    def test_annotate_merge_9(self):
 
178
        k = self.make_test_knit(True)
 
179
        k.add_lines('text-a1', [], ['a\n', 'b\n', 'c\n'])
 
180
        k.add_lines('text-a2', [], ['x\n', 'y\n', 'z\n'])
 
181
        k.add_lines('text-am', ['text-a1', 'text-a2'], ['k\n', 'y\n', 'c\n'])
 
182
        origins = k.annotate('text-am')
 
183
        self.assertEquals(origins[0], ('text-am', 'k\n'))
 
184
        self.assertEquals(origins[1], ('text-a2', 'y\n'))
 
185
        self.assertEquals(origins[2], ('text-a1', 'c\n'))
 
186
 
 
187
    def test_annotate_merge_3(self):
 
188
        k = self.make_test_knit(True)
 
189
        k.add_lines('text-a1', [], ['a\n', 'b\n', 'c\n'])
 
190
        k.add_lines('text-a2', [] ,['x\n', 'y\n', 'z\n'])
 
191
        k.add_lines('text-am', ['text-a1', 'text-a2'], ['k\n', 'y\n', 'z\n'])
 
192
        origins = k.annotate('text-am')
 
193
        self.assertEquals(origins[0], ('text-am', 'k\n'))
 
194
        self.assertEquals(origins[1], ('text-a2', 'y\n'))
 
195
        self.assertEquals(origins[2], ('text-a2', 'z\n'))
 
196
 
 
197
    def test_annotate_merge_4(self):
 
198
        k = self.make_test_knit(True)
 
199
        k.add_lines('text-a1', [], ['a\n', 'b\n', 'c\n'])
 
200
        k.add_lines('text-a2', [], ['x\n', 'y\n', 'z\n'])
 
201
        k.add_lines('text-a3', ['text-a1'], ['a\n', 'b\n', 'p\n'])
 
202
        k.add_lines('text-am', ['text-a2', 'text-a3'], ['a\n', 'b\n', 'z\n'])
 
203
        origins = k.annotate('text-am')
 
204
        self.assertEquals(origins[0], ('text-a1', 'a\n'))
 
205
        self.assertEquals(origins[1], ('text-a1', 'b\n'))
 
206
        self.assertEquals(origins[2], ('text-a2', 'z\n'))
 
207
 
 
208
    def test_annotate_merge_5(self):
 
209
        k = self.make_test_knit(True)
 
210
        k.add_lines('text-a1', [], ['a\n', 'b\n', 'c\n'])
 
211
        k.add_lines('text-a2', [], ['d\n', 'e\n', 'f\n'])
 
212
        k.add_lines('text-a3', [], ['x\n', 'y\n', 'z\n'])
 
213
        k.add_lines('text-am',
 
214
                    ['text-a1', 'text-a2', 'text-a3'],
 
215
                    ['a\n', 'e\n', 'z\n'])
 
216
        origins = k.annotate('text-am')
 
217
        self.assertEquals(origins[0], ('text-a1', 'a\n'))
 
218
        self.assertEquals(origins[1], ('text-a2', 'e\n'))
 
219
        self.assertEquals(origins[2], ('text-a3', 'z\n'))
 
220
 
 
221
    def test_annotate_file_cherry_pick(self):
 
222
        k = self.make_test_knit(True)
 
223
        k.add_lines('text-1', [], ['a\n', 'b\n', 'c\n'])
 
224
        k.add_lines('text-2', ['text-1'], ['d\n', 'e\n', 'f\n'])
 
225
        k.add_lines('text-3', ['text-2', 'text-1'], ['a\n', 'b\n', 'c\n'])
 
226
        origins = k.annotate('text-3')
 
227
        self.assertEquals(origins[0], ('text-1', 'a\n'))
 
228
        self.assertEquals(origins[1], ('text-1', 'b\n'))
 
229
        self.assertEquals(origins[2], ('text-1', 'c\n'))
 
230
 
 
231
    def test_knit_join(self):
 
232
        """Store in knit with parents"""
 
233
        k1 = KnitVersionedFile('test1', get_transport('.'), factory=KnitPlainFactory(), create=True)
 
234
        k1.add_lines('text-a', [], split_lines(TEXT_1))
 
235
        k1.add_lines('text-b', ['text-a'], split_lines(TEXT_1))
 
236
 
 
237
        k1.add_lines('text-c', [], split_lines(TEXT_1))
 
238
        k1.add_lines('text-d', ['text-c'], split_lines(TEXT_1))
 
239
 
 
240
        k1.add_lines('text-m', ['text-b', 'text-d'], split_lines(TEXT_1))
 
241
 
 
242
        k2 = KnitVersionedFile('test2', get_transport('.'), factory=KnitPlainFactory(), create=True)
 
243
        count = k2.join(k1, version_ids=['text-m'])
 
244
        self.assertEquals(count, 5)
 
245
        self.assertTrue(k2.has_version('text-a'))
 
246
        self.assertTrue(k2.has_version('text-c'))
 
247
 
 
248
    def test_reannotate(self):
 
249
        k1 = KnitVersionedFile('knit1', get_transport('.'),
 
250
                               factory=KnitAnnotateFactory(), create=True)
 
251
        # 0
 
252
        k1.add_lines('text-a', [], ['a\n', 'b\n'])
 
253
        # 1
 
254
        k1.add_lines('text-b', ['text-a'], ['a\n', 'c\n'])
 
255
 
 
256
        k2 = KnitVersionedFile('test2', get_transport('.'),
 
257
                               factory=KnitAnnotateFactory(), create=True)
 
258
        k2.join(k1, version_ids=['text-b'])
 
259
 
 
260
        # 2
 
261
        k1.add_lines('text-X', ['text-b'], ['a\n', 'b\n'])
 
262
        # 2
 
263
        k2.add_lines('text-c', ['text-b'], ['z\n', 'c\n'])
 
264
        # 3
 
265
        k2.add_lines('text-Y', ['text-b'], ['b\n', 'c\n'])
 
266
 
 
267
        # test-c will have index 3
 
268
        k1.join(k2, version_ids=['text-c'])
 
269
 
 
270
        lines = k1.get_lines('text-c')
 
271
        self.assertEquals(lines, ['z\n', 'c\n'])
 
272
 
 
273
        origins = k1.annotate('text-c')
 
274
        self.assertEquals(origins[0], ('text-c', 'z\n'))
 
275
        self.assertEquals(origins[1], ('text-b', 'c\n'))
 
276
 
 
277
    def test_get_line_delta_texts(self):
 
278
        """Make sure we can call get_texts on text with reused line deltas"""
 
279
        k1 = KnitVersionedFile('test1', get_transport('.'), 
 
280
                               factory=KnitPlainFactory(), create=True)
 
281
        for t in range(3):
 
282
            if t == 0:
 
283
                parents = []
 
284
            else:
 
285
                parents = ['%d' % (t-1)]
 
286
            k1.add_lines('%d' % t, parents, ['hello\n'] * t)
 
287
        k1.get_texts(('%d' % t) for t in range(3))
 
288
        
 
289
    def test_iter_lines_reads_in_order(self):
 
290
        t = MemoryTransport()
 
291
        instrumented_t = TransportLogger(t)
 
292
        k1 = KnitVersionedFile('id', instrumented_t, create=True, delta=True)
 
293
        self.assertEqual([('id.kndx',)], instrumented_t._calls)
 
294
        # add texts with no required ordering
 
295
        k1.add_lines('base', [], ['text\n'])
 
296
        k1.add_lines('base2', [], ['text2\n'])
 
297
        k1.clear_cache()
 
298
        instrumented_t._calls = []
 
299
        # request a last-first iteration
 
300
        results = list(k1.iter_lines_added_or_present_in_versions(['base2', 'base']))
 
301
        self.assertEqual([('id.knit', [(0, 87), (87, 89)])], instrumented_t._calls)
 
302
        self.assertEqual(['text\n', 'text2\n'], results)
 
303
 
 
304
    def test_create_empty_annotated(self):
 
305
        k1 = self.make_test_knit(True)
 
306
        # 0
 
307
        k1.add_lines('text-a', [], ['a\n', 'b\n'])
 
308
        k2 = k1.create_empty('t', MemoryTransport())
 
309
        self.assertTrue(isinstance(k2.factory, KnitAnnotateFactory))
 
310
        self.assertEqual(k1.delta, k2.delta)
 
311
        # the generic test checks for empty content and file class
 
312
 
 
313
    def test_knit_format(self):
 
314
        # this tests that a new knit index file has the expected content
 
315
        # and that is writes the data we expect as records are added.
 
316
        knit = self.make_test_knit(True)
 
317
        # Now knit files are not created until we first add data to them
 
318
        self.assertFileEqual("# bzr knit index 8\n", 'test.kndx')
 
319
        knit.add_lines_with_ghosts('revid', ['a_ghost'], ['a\n'])
 
320
        self.assertFileEqual(
 
321
            "# bzr knit index 8\n"
 
322
            "\n"
 
323
            "revid fulltext 0 84 .a_ghost :",
 
324
            'test.kndx')
 
325
        knit.add_lines_with_ghosts('revid2', ['revid'], ['a\n'])
 
326
        self.assertFileEqual(
 
327
            "# bzr knit index 8\n"
 
328
            "\nrevid fulltext 0 84 .a_ghost :"
 
329
            "\nrevid2 line-delta 84 82 0 :",
 
330
            'test.kndx')
 
331
        # we should be able to load this file again
 
332
        knit = KnitVersionedFile('test', get_transport('.'), access_mode='r')
 
333
        self.assertEqual(['revid', 'revid2'], knit.versions())
 
334
        # write a short write to the file and ensure that its ignored
 
335
        indexfile = file('test.kndx', 'at')
 
336
        indexfile.write('\nrevid3 line-delta 166 82 1 2 3 4 5 .phwoar:demo ')
 
337
        indexfile.close()
 
338
        # we should be able to load this file again
 
339
        knit = KnitVersionedFile('test', get_transport('.'), access_mode='w')
 
340
        self.assertEqual(['revid', 'revid2'], knit.versions())
 
341
        # and add a revision with the same id the failed write had
 
342
        knit.add_lines('revid3', ['revid2'], ['a\n'])
 
343
        # and when reading it revid3 should now appear.
 
344
        knit = KnitVersionedFile('test', get_transport('.'), access_mode='r')
 
345
        self.assertEqual(['revid', 'revid2', 'revid3'], knit.versions())
 
346
        self.assertEqual(['revid2'], knit.get_parents('revid3'))
 
347
 
 
348
    def test_delay_create(self):
 
349
        """Test that passing delay_create=True creates files late"""
 
350
        knit = self.make_test_knit(annotate=True, delay_create=True)
 
351
        self.failIfExists('test.knit')
 
352
        self.failIfExists('test.kndx')
 
353
        knit.add_lines_with_ghosts('revid', ['a_ghost'], ['a\n'])
 
354
        self.failUnlessExists('test.knit')
 
355
        self.assertFileEqual(
 
356
            "# bzr knit index 8\n"
 
357
            "\n"
 
358
            "revid fulltext 0 84 .a_ghost :",
 
359
            'test.kndx')
 
360
 
 
361
    def test_create_parent_dir(self):
 
362
        """create_parent_dir can create knits in nonexistant dirs"""
 
363
        # Has no effect if we don't set 'delay_create'
 
364
        trans = get_transport('.')
 
365
        self.assertRaises(NoSuchFile, KnitVersionedFile, 'dir/test',
 
366
                          trans, access_mode='w', factory=None,
 
367
                          create=True, create_parent_dir=True)
 
368
        # Nothing should have changed yet
 
369
        knit = KnitVersionedFile('dir/test', trans, access_mode='w',
 
370
                                 factory=None, create=True,
 
371
                                 create_parent_dir=True,
 
372
                                 delay_create=True)
 
373
        self.failIfExists('dir/test.knit')
 
374
        self.failIfExists('dir/test.kndx')
 
375
        self.failIfExists('dir')
 
376
        knit.add_lines('revid', [], ['a\n'])
 
377
        self.failUnlessExists('dir')
 
378
        self.failUnlessExists('dir/test.knit')
 
379
        self.assertFileEqual(
 
380
            "# bzr knit index 8\n"
 
381
            "\n"
 
382
            "revid fulltext 0 84  :",
 
383
            'dir/test.kndx')
 
384
 
 
385
    def test_create_mode_700(self):
 
386
        trans = get_transport('.')
 
387
        if not trans._can_roundtrip_unix_modebits():
 
388
            # Can't roundtrip, so no need to run this test
 
389
            return
 
390
        knit = KnitVersionedFile('dir/test', trans, access_mode='w',
 
391
                                 factory=None, create=True,
 
392
                                 create_parent_dir=True,
 
393
                                 delay_create=True,
 
394
                                 file_mode=0600,
 
395
                                 dir_mode=0700)
 
396
        knit.add_lines('revid', [], ['a\n'])
 
397
        self.assertTransportMode(trans, 'dir', 0700)
 
398
        self.assertTransportMode(trans, 'dir/test.knit', 0600)
 
399
        self.assertTransportMode(trans, 'dir/test.kndx', 0600)
 
400
 
 
401
    def test_create_mode_770(self):
 
402
        trans = get_transport('.')
 
403
        if not trans._can_roundtrip_unix_modebits():
 
404
            # Can't roundtrip, so no need to run this test
 
405
            return
 
406
        knit = KnitVersionedFile('dir/test', trans, access_mode='w',
 
407
                                 factory=None, create=True,
 
408
                                 create_parent_dir=True,
 
409
                                 delay_create=True,
 
410
                                 file_mode=0660,
 
411
                                 dir_mode=0770)
 
412
        knit.add_lines('revid', [], ['a\n'])
 
413
        self.assertTransportMode(trans, 'dir', 0770)
 
414
        self.assertTransportMode(trans, 'dir/test.knit', 0660)
 
415
        self.assertTransportMode(trans, 'dir/test.kndx', 0660)
 
416
 
 
417
    def test_create_mode_777(self):
 
418
        trans = get_transport('.')
 
419
        if not trans._can_roundtrip_unix_modebits():
 
420
            # Can't roundtrip, so no need to run this test
 
421
            return
 
422
        knit = KnitVersionedFile('dir/test', trans, access_mode='w',
 
423
                                 factory=None, create=True,
 
424
                                 create_parent_dir=True,
 
425
                                 delay_create=True,
 
426
                                 file_mode=0666,
 
427
                                 dir_mode=0777)
 
428
        knit.add_lines('revid', [], ['a\n'])
 
429
        self.assertTransportMode(trans, 'dir', 0777)
 
430
        self.assertTransportMode(trans, 'dir/test.knit', 0666)
 
431
        self.assertTransportMode(trans, 'dir/test.kndx', 0666)
 
432
 
 
433
    def test_plan_merge(self):
 
434
        my_knit = self.make_test_knit(annotate=True)
 
435
        my_knit.add_lines('text1', [], split_lines(TEXT_1))
 
436
        my_knit.add_lines('text1a', ['text1'], split_lines(TEXT_1A))
 
437
        my_knit.add_lines('text1b', ['text1'], split_lines(TEXT_1B))
 
438
        plan = list(my_knit.plan_merge('text1a', 'text1b'))
 
439
        for plan_line, expected_line in zip(plan, AB_MERGE):
 
440
            self.assertEqual(plan_line, expected_line)
 
441
 
 
442
 
 
443
TEXT_1 = """\
 
444
Banana cup cakes:
 
445
 
 
446
- bananas
 
447
- eggs
 
448
- broken tea cups
 
449
"""
 
450
 
 
451
TEXT_1A = """\
 
452
Banana cup cake recipe
 
453
(serves 6)
 
454
 
 
455
- bananas
 
456
- eggs
 
457
- broken tea cups
 
458
- self-raising flour
 
459
"""
 
460
 
 
461
TEXT_1B = """\
 
462
Banana cup cake recipe
 
463
 
 
464
- bananas (do not use plantains!!!)
 
465
- broken tea cups
 
466
- flour
 
467
"""
 
468
 
 
469
delta_1_1a = """\
 
470
0,1,2
 
471
Banana cup cake recipe
 
472
(serves 6)
 
473
5,5,1
 
474
- self-raising flour
 
475
"""
 
476
 
 
477
TEXT_2 = """\
 
478
Boeuf bourguignon
 
479
 
 
480
- beef
 
481
- red wine
 
482
- small onions
 
483
- carrot
 
484
- mushrooms
 
485
"""
 
486
 
 
487
AB_MERGE_TEXT="""unchanged|Banana cup cake recipe
 
488
new-a|(serves 6)
 
489
unchanged|
 
490
killed-b|- bananas
 
491
killed-b|- eggs
 
492
new-b|- bananas (do not use plantains!!!)
 
493
unchanged|- broken tea cups
 
494
new-a|- self-raising flour
 
495
new-b|- flour
 
496
"""
 
497
AB_MERGE=[tuple(l.split('|')) for l in AB_MERGE_TEXT.splitlines(True)]
 
498
 
 
499
 
 
500
def line_delta(from_lines, to_lines):
 
501
    """Generate line-based delta from one text to another"""
 
502
    s = difflib.SequenceMatcher(None, from_lines, to_lines)
 
503
    for op in s.get_opcodes():
 
504
        if op[0] == 'equal':
 
505
            continue
 
506
        yield '%d,%d,%d\n' % (op[1], op[2], op[4]-op[3])
 
507
        for i in range(op[3], op[4]):
 
508
            yield to_lines[i]
 
509
 
 
510
 
 
511
def apply_line_delta(basis_lines, delta_lines):
 
512
    """Apply a line-based perfect diff
 
513
    
 
514
    basis_lines -- text to apply the patch to
 
515
    delta_lines -- diff instructions and content
 
516
    """
 
517
    out = basis_lines[:]
 
518
    i = 0
 
519
    offset = 0
 
520
    while i < len(delta_lines):
 
521
        l = delta_lines[i]
 
522
        a, b, c = map(long, l.split(','))
 
523
        i = i + 1
 
524
        out[offset+a:offset+b] = delta_lines[i:i+c]
 
525
        i = i + c
 
526
        offset = offset + (b - a) + c
 
527
    return out
 
528
 
 
529
 
 
530
class TestWeaveToKnit(KnitTests):
 
531
 
 
532
    def test_weave_to_knit_matches(self):
 
533
        # check that the WeaveToKnit is_compatible function
 
534
        # registers True for a Weave to a Knit.
 
535
        w = Weave()
 
536
        k = self.make_test_knit()
 
537
        self.failUnless(WeaveToKnit.is_compatible(w, k))
 
538
        self.failIf(WeaveToKnit.is_compatible(k, w))
 
539
        self.failIf(WeaveToKnit.is_compatible(w, w))
 
540
        self.failIf(WeaveToKnit.is_compatible(k, k))
 
541
 
 
542
 
 
543
class TestKnitCaching(KnitTests):
 
544
    
 
545
    def create_knit(self, cache_add=False):
 
546
        k = self.make_test_knit(True)
 
547
        if cache_add:
 
548
            k.enable_cache()
 
549
 
 
550
        k.add_lines('text-1', [], split_lines(TEXT_1))
 
551
        k.add_lines('text-2', [], split_lines(TEXT_2))
 
552
        return k
 
553
 
 
554
    def test_no_caching(self):
 
555
        k = self.create_knit()
 
556
        # Nothing should be cached without setting 'enable_cache'
 
557
        self.assertEqual({}, k._data._cache)
 
558
 
 
559
    def test_cache_add_and_clear(self):
 
560
        k = self.create_knit(True)
 
561
 
 
562
        self.assertEqual(['text-1', 'text-2'], sorted(k._data._cache.keys()))
 
563
 
 
564
        k.clear_cache()
 
565
        self.assertEqual({}, k._data._cache)
 
566
 
 
567
    def test_cache_data_read_raw(self):
 
568
        k = self.create_knit()
 
569
 
 
570
        # Now cache and read
 
571
        k.enable_cache()
 
572
 
 
573
        def read_one_raw(version):
 
574
            pos_map = k._get_components_positions([version])
 
575
            method, pos, size, next = pos_map[version]
 
576
            lst = list(k._data.read_records_iter_raw([(version, pos, size)]))
 
577
            self.assertEqual(1, len(lst))
 
578
            return lst[0]
 
579
 
 
580
        val = read_one_raw('text-1')
 
581
        self.assertEqual({'text-1':val[1]}, k._data._cache)
 
582
 
 
583
        k.clear_cache()
 
584
        # After clear, new reads are not cached
 
585
        self.assertEqual({}, k._data._cache)
 
586
 
 
587
        val2 = read_one_raw('text-1')
 
588
        self.assertEqual(val, val2)
 
589
        self.assertEqual({}, k._data._cache)
 
590
 
 
591
    def test_cache_data_read(self):
 
592
        k = self.create_knit()
 
593
 
 
594
        def read_one(version):
 
595
            pos_map = k._get_components_positions([version])
 
596
            method, pos, size, next = pos_map[version]
 
597
            lst = list(k._data.read_records_iter([(version, pos, size)]))
 
598
            self.assertEqual(1, len(lst))
 
599
            return lst[0]
 
600
 
 
601
        # Now cache and read
 
602
        k.enable_cache()
 
603
 
 
604
        val = read_one('text-2')
 
605
        self.assertEqual(['text-2'], k._data._cache.keys())
 
606
        self.assertEqual('text-2', val[0])
 
607
        content, digest = k._data._parse_record('text-2',
 
608
                                                k._data._cache['text-2'])
 
609
        self.assertEqual(content, val[1])
 
610
        self.assertEqual(digest, val[2])
 
611
 
 
612
        k.clear_cache()
 
613
        self.assertEqual({}, k._data._cache)
 
614
 
 
615
        val2 = read_one('text-2')
 
616
        self.assertEqual(val, val2)
 
617
        self.assertEqual({}, k._data._cache)
 
618
 
 
619
    def test_cache_read(self):
 
620
        k = self.create_knit()
 
621
        k.enable_cache()
 
622
 
 
623
        text = k.get_text('text-1')
 
624
        self.assertEqual(TEXT_1, text)
 
625
        self.assertEqual(['text-1'], k._data._cache.keys())
 
626
 
 
627
        k.clear_cache()
 
628
        self.assertEqual({}, k._data._cache)
 
629
 
 
630
        text = k.get_text('text-1')
 
631
        self.assertEqual(TEXT_1, text)
 
632
        self.assertEqual({}, k._data._cache)