/brz/remove-bazaar

To get this branch, use:
bzr branch http://gegoxaren.bato24.eu/bzr/brz/remove-bazaar

« back to all changes in this revision

Viewing changes to bzrlib/tests/test_knit.py

  • Committer: John Arbash Meinel
  • Date: 2006-05-10 19:43:34 UTC
  • mto: This revision was merged to the branch mainline in revision 1752.
  • Revision ID: john@arbash-meinel.com-20060510194334-0c20aad23237d047
Working on getting normalize_url working.

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
# Copyright (C) 2005, 2006 by Canonical Ltd
 
2
#
 
3
# This program is free software; you can redistribute it and/or modify
 
4
# it under the terms of the GNU General Public License as published by
 
5
# the Free Software Foundation; either version 2 of the License, or
 
6
# (at your option) any later version.
 
7
#
 
8
# This program is distributed in the hope that it will be useful,
 
9
# but WITHOUT ANY WARRANTY; without even the implied warranty of
 
10
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
11
# GNU General Public License for more details.
 
12
#
 
13
# You should have received a copy of the GNU General Public License
 
14
# along with this program; if not, write to the Free Software
 
15
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
 
16
 
 
17
"""Tests for Knit data structure"""
 
18
 
 
19
 
 
20
import difflib
 
21
 
 
22
 
 
23
from bzrlib.errors import KnitError, RevisionAlreadyPresent
 
24
from bzrlib.knit import KnitVersionedFile, KnitPlainFactory, KnitAnnotateFactory
 
25
from bzrlib.osutils import split_lines
 
26
from bzrlib.tests import TestCaseInTempDir
 
27
from bzrlib.transport import TransportLogger, get_transport
 
28
from bzrlib.transport.memory import MemoryTransport
 
29
 
 
30
 
 
31
class KnitTests(TestCaseInTempDir):
 
32
 
 
33
    def add_stock_one_and_one_a(self, k):
 
34
        k.add_lines('text-1', [], split_lines(TEXT_1))
 
35
        k.add_lines('text-1a', ['text-1'], split_lines(TEXT_1A))
 
36
 
 
37
    def test_knit_constructor(self):
 
38
        """Construct empty k"""
 
39
        self.make_test_knit()
 
40
 
 
41
    def make_test_knit(self, annotate=False):
 
42
        if not annotate:
 
43
            factory = KnitPlainFactory()
 
44
        else:
 
45
            factory = None
 
46
        return KnitVersionedFile('test', get_transport('.'), access_mode='w', factory=factory, create=True)
 
47
 
 
48
    def test_knit_add(self):
 
49
        """Store one text in knit and retrieve"""
 
50
        k = self.make_test_knit()
 
51
        k.add_lines('text-1', [], split_lines(TEXT_1))
 
52
        self.assertTrue(k.has_version('text-1'))
 
53
        self.assertEqualDiff(''.join(k.get_lines('text-1')), TEXT_1)
 
54
 
 
55
    def test_knit_reload(self):
 
56
        # test that the content in a reloaded knit is correct
 
57
        k = self.make_test_knit()
 
58
        k.add_lines('text-1', [], split_lines(TEXT_1))
 
59
        del k
 
60
        k2 = KnitVersionedFile('test', get_transport('.'), access_mode='r', factory=KnitPlainFactory(), create=True)
 
61
        self.assertTrue(k2.has_version('text-1'))
 
62
        self.assertEqualDiff(''.join(k2.get_lines('text-1')), TEXT_1)
 
63
 
 
64
    def test_knit_several(self):
 
65
        """Store several texts in a knit"""
 
66
        k = self.make_test_knit()
 
67
        k.add_lines('text-1', [], split_lines(TEXT_1))
 
68
        k.add_lines('text-2', [], split_lines(TEXT_2))
 
69
        self.assertEqualDiff(''.join(k.get_lines('text-1')), TEXT_1)
 
70
        self.assertEqualDiff(''.join(k.get_lines('text-2')), TEXT_2)
 
71
        
 
72
    def test_repeated_add(self):
 
73
        """Knit traps attempt to replace existing version"""
 
74
        k = self.make_test_knit()
 
75
        k.add_lines('text-1', [], split_lines(TEXT_1))
 
76
        self.assertRaises(RevisionAlreadyPresent, 
 
77
                k.add_lines,
 
78
                'text-1', [], split_lines(TEXT_1))
 
79
 
 
80
    def test_empty(self):
 
81
        k = self.make_test_knit(True)
 
82
        k.add_lines('text-1', [], [])
 
83
        self.assertEquals(k.get_lines('text-1'), [])
 
84
 
 
85
    def test_incomplete(self):
 
86
        """Test if texts without a ending line-end can be inserted and
 
87
        extracted."""
 
88
        k = KnitVersionedFile('test', get_transport('.'), delta=False, create=True)
 
89
        k.add_lines('text-1', [], ['a\n',    'b'  ])
 
90
        k.add_lines('text-2', ['text-1'], ['a\rb\n', 'b\n'])
 
91
        # reopening ensures maximum room for confusion
 
92
        k = KnitVersionedFile('test', get_transport('.'), delta=False, create=True)
 
93
        self.assertEquals(k.get_lines('text-1'), ['a\n',    'b'  ])
 
94
        self.assertEquals(k.get_lines('text-2'), ['a\rb\n', 'b\n'])
 
95
 
 
96
    def test_delta(self):
 
97
        """Expression of knit delta as lines"""
 
98
        k = self.make_test_knit()
 
99
        td = list(line_delta(TEXT_1.splitlines(True),
 
100
                             TEXT_1A.splitlines(True)))
 
101
        self.assertEqualDiff(''.join(td), delta_1_1a)
 
102
        out = apply_line_delta(TEXT_1.splitlines(True), td)
 
103
        self.assertEqualDiff(''.join(out), TEXT_1A)
 
104
 
 
105
    def test_add_with_parents(self):
 
106
        """Store in knit with parents"""
 
107
        k = self.make_test_knit()
 
108
        self.add_stock_one_and_one_a(k)
 
109
        self.assertEquals(k.get_parents('text-1'), [])
 
110
        self.assertEquals(k.get_parents('text-1a'), ['text-1'])
 
111
 
 
112
    def test_ancestry(self):
 
113
        """Store in knit with parents"""
 
114
        k = self.make_test_knit()
 
115
        self.add_stock_one_and_one_a(k)
 
116
        self.assertEquals(set(k.get_ancestry(['text-1a'])), set(['text-1a', 'text-1']))
 
117
 
 
118
    def test_add_delta(self):
 
119
        """Store in knit with parents"""
 
120
        k = KnitVersionedFile('test', get_transport('.'), factory=KnitPlainFactory(),
 
121
            delta=True, create=True)
 
122
        self.add_stock_one_and_one_a(k)
 
123
        k.clear_cache()
 
124
        self.assertEqualDiff(''.join(k.get_lines('text-1a')), TEXT_1A)
 
125
 
 
126
    def test_annotate(self):
 
127
        """Annotations"""
 
128
        k = KnitVersionedFile('knit', get_transport('.'), factory=KnitAnnotateFactory(),
 
129
            delta=True, create=True)
 
130
        self.insert_and_test_small_annotate(k)
 
131
 
 
132
    def insert_and_test_small_annotate(self, k):
 
133
        """test annotation with k works correctly."""
 
134
        k.add_lines('text-1', [], ['a\n', 'b\n'])
 
135
        k.add_lines('text-2', ['text-1'], ['a\n', 'c\n'])
 
136
 
 
137
        origins = k.annotate('text-2')
 
138
        self.assertEquals(origins[0], ('text-1', 'a\n'))
 
139
        self.assertEquals(origins[1], ('text-2', 'c\n'))
 
140
 
 
141
    def test_annotate_fulltext(self):
 
142
        """Annotations"""
 
143
        k = KnitVersionedFile('knit', get_transport('.'), factory=KnitAnnotateFactory(),
 
144
            delta=False, create=True)
 
145
        self.insert_and_test_small_annotate(k)
 
146
 
 
147
    def test_annotate_merge_1(self):
 
148
        k = self.make_test_knit(True)
 
149
        k.add_lines('text-a1', [], ['a\n', 'b\n'])
 
150
        k.add_lines('text-a2', [], ['d\n', 'c\n'])
 
151
        k.add_lines('text-am', ['text-a1', 'text-a2'], ['d\n', 'b\n'])
 
152
        origins = k.annotate('text-am')
 
153
        self.assertEquals(origins[0], ('text-a2', 'd\n'))
 
154
        self.assertEquals(origins[1], ('text-a1', 'b\n'))
 
155
 
 
156
    def test_annotate_merge_2(self):
 
157
        k = self.make_test_knit(True)
 
158
        k.add_lines('text-a1', [], ['a\n', 'b\n', 'c\n'])
 
159
        k.add_lines('text-a2', [], ['x\n', 'y\n', 'z\n'])
 
160
        k.add_lines('text-am', ['text-a1', 'text-a2'], ['a\n', 'y\n', 'c\n'])
 
161
        origins = k.annotate('text-am')
 
162
        self.assertEquals(origins[0], ('text-a1', 'a\n'))
 
163
        self.assertEquals(origins[1], ('text-a2', 'y\n'))
 
164
        self.assertEquals(origins[2], ('text-a1', 'c\n'))
 
165
 
 
166
    def test_annotate_merge_9(self):
 
167
        k = self.make_test_knit(True)
 
168
        k.add_lines('text-a1', [], ['a\n', 'b\n', 'c\n'])
 
169
        k.add_lines('text-a2', [], ['x\n', 'y\n', 'z\n'])
 
170
        k.add_lines('text-am', ['text-a1', 'text-a2'], ['k\n', 'y\n', 'c\n'])
 
171
        origins = k.annotate('text-am')
 
172
        self.assertEquals(origins[0], ('text-am', 'k\n'))
 
173
        self.assertEquals(origins[1], ('text-a2', 'y\n'))
 
174
        self.assertEquals(origins[2], ('text-a1', 'c\n'))
 
175
 
 
176
    def test_annotate_merge_3(self):
 
177
        k = self.make_test_knit(True)
 
178
        k.add_lines('text-a1', [], ['a\n', 'b\n', 'c\n'])
 
179
        k.add_lines('text-a2', [] ,['x\n', 'y\n', 'z\n'])
 
180
        k.add_lines('text-am', ['text-a1', 'text-a2'], ['k\n', 'y\n', 'z\n'])
 
181
        origins = k.annotate('text-am')
 
182
        self.assertEquals(origins[0], ('text-am', 'k\n'))
 
183
        self.assertEquals(origins[1], ('text-a2', 'y\n'))
 
184
        self.assertEquals(origins[2], ('text-a2', 'z\n'))
 
185
 
 
186
    def test_annotate_merge_4(self):
 
187
        k = self.make_test_knit(True)
 
188
        k.add_lines('text-a1', [], ['a\n', 'b\n', 'c\n'])
 
189
        k.add_lines('text-a2', [], ['x\n', 'y\n', 'z\n'])
 
190
        k.add_lines('text-a3', ['text-a1'], ['a\n', 'b\n', 'p\n'])
 
191
        k.add_lines('text-am', ['text-a2', 'text-a3'], ['a\n', 'b\n', 'z\n'])
 
192
        origins = k.annotate('text-am')
 
193
        self.assertEquals(origins[0], ('text-a1', 'a\n'))
 
194
        self.assertEquals(origins[1], ('text-a1', 'b\n'))
 
195
        self.assertEquals(origins[2], ('text-a2', 'z\n'))
 
196
 
 
197
    def test_annotate_merge_5(self):
 
198
        k = self.make_test_knit(True)
 
199
        k.add_lines('text-a1', [], ['a\n', 'b\n', 'c\n'])
 
200
        k.add_lines('text-a2', [], ['d\n', 'e\n', 'f\n'])
 
201
        k.add_lines('text-a3', [], ['x\n', 'y\n', 'z\n'])
 
202
        k.add_lines('text-am',
 
203
                    ['text-a1', 'text-a2', 'text-a3'],
 
204
                    ['a\n', 'e\n', 'z\n'])
 
205
        origins = k.annotate('text-am')
 
206
        self.assertEquals(origins[0], ('text-a1', 'a\n'))
 
207
        self.assertEquals(origins[1], ('text-a2', 'e\n'))
 
208
        self.assertEquals(origins[2], ('text-a3', 'z\n'))
 
209
 
 
210
    def test_annotate_file_cherry_pick(self):
 
211
        k = self.make_test_knit(True)
 
212
        k.add_lines('text-1', [], ['a\n', 'b\n', 'c\n'])
 
213
        k.add_lines('text-2', ['text-1'], ['d\n', 'e\n', 'f\n'])
 
214
        k.add_lines('text-3', ['text-2', 'text-1'], ['a\n', 'b\n', 'c\n'])
 
215
        origins = k.annotate('text-3')
 
216
        self.assertEquals(origins[0], ('text-1', 'a\n'))
 
217
        self.assertEquals(origins[1], ('text-1', 'b\n'))
 
218
        self.assertEquals(origins[2], ('text-1', 'c\n'))
 
219
 
 
220
    def test_knit_join(self):
 
221
        """Store in knit with parents"""
 
222
        k1 = KnitVersionedFile('test1', get_transport('.'), factory=KnitPlainFactory(), create=True)
 
223
        k1.add_lines('text-a', [], split_lines(TEXT_1))
 
224
        k1.add_lines('text-b', ['text-a'], split_lines(TEXT_1))
 
225
 
 
226
        k1.add_lines('text-c', [], split_lines(TEXT_1))
 
227
        k1.add_lines('text-d', ['text-c'], split_lines(TEXT_1))
 
228
 
 
229
        k1.add_lines('text-m', ['text-b', 'text-d'], split_lines(TEXT_1))
 
230
 
 
231
        k2 = KnitVersionedFile('test2', get_transport('.'), factory=KnitPlainFactory(), create=True)
 
232
        count = k2.join(k1, version_ids=['text-m'])
 
233
        self.assertEquals(count, 5)
 
234
        self.assertTrue(k2.has_version('text-a'))
 
235
        self.assertTrue(k2.has_version('text-c'))
 
236
 
 
237
    def test_reannotate(self):
 
238
        k1 = KnitVersionedFile('knit1', get_transport('.'),
 
239
                               factory=KnitAnnotateFactory(), create=True)
 
240
        # 0
 
241
        k1.add_lines('text-a', [], ['a\n', 'b\n'])
 
242
        # 1
 
243
        k1.add_lines('text-b', ['text-a'], ['a\n', 'c\n'])
 
244
 
 
245
        k2 = KnitVersionedFile('test2', get_transport('.'),
 
246
                               factory=KnitAnnotateFactory(), create=True)
 
247
        k2.join(k1, version_ids=['text-b'])
 
248
 
 
249
        # 2
 
250
        k1.add_lines('text-X', ['text-b'], ['a\n', 'b\n'])
 
251
        # 2
 
252
        k2.add_lines('text-c', ['text-b'], ['z\n', 'c\n'])
 
253
        # 3
 
254
        k2.add_lines('text-Y', ['text-b'], ['b\n', 'c\n'])
 
255
 
 
256
        # test-c will have index 3
 
257
        k1.join(k2, version_ids=['text-c'])
 
258
 
 
259
        lines = k1.get_lines('text-c')
 
260
        self.assertEquals(lines, ['z\n', 'c\n'])
 
261
 
 
262
        origins = k1.annotate('text-c')
 
263
        self.assertEquals(origins[0], ('text-c', 'z\n'))
 
264
        self.assertEquals(origins[1], ('text-b', 'c\n'))
 
265
 
 
266
    def test_extraction_reads_components_once(self):
 
267
        t = MemoryTransport()
 
268
        instrumented_t = TransportLogger(t)
 
269
        k1 = KnitVersionedFile('id', instrumented_t, create=True, delta=True)
 
270
        # should read the index
 
271
        self.assertEqual([('id.kndx',)], instrumented_t._calls)
 
272
        instrumented_t._calls = []
 
273
        # add a text       
 
274
        k1.add_lines('base', [], ['text\n'])
 
275
        # should not have read at all
 
276
        self.assertEqual([], instrumented_t._calls)
 
277
 
 
278
        # add a text
 
279
        k1.add_lines('sub', ['base'], ['text\n', 'text2\n'])
 
280
        # should not have read at all
 
281
        self.assertEqual([], instrumented_t._calls)
 
282
        
 
283
        # read a text
 
284
        k1.get_lines('sub')
 
285
        # should not have read at all
 
286
        self.assertEqual([], instrumented_t._calls)
 
287
 
 
288
        # clear the cache
 
289
        k1.clear_cache()
 
290
 
 
291
        # read a text
 
292
        k1.get_lines('base')
 
293
        # should have read a component
 
294
        # should not have read the first component only
 
295
        self.assertEqual([('id.knit', [(0, 87)])], instrumented_t._calls)
 
296
        instrumented_t._calls = []
 
297
        # read again
 
298
        k1.get_lines('base')
 
299
        # should not have read at all
 
300
        self.assertEqual([], instrumented_t._calls)
 
301
        # and now read the other component
 
302
        k1.get_lines('sub')
 
303
        # should have read the second component
 
304
        self.assertEqual([('id.knit', [(87, 93)])], instrumented_t._calls)
 
305
        instrumented_t._calls = []
 
306
 
 
307
        # clear the cache
 
308
        k1.clear_cache()
 
309
        # add a text cold 
 
310
        k1.add_lines('sub2', ['base'], ['text\n', 'text3\n'])
 
311
        # should read the first component only
 
312
        self.assertEqual([('id.knit', [(0, 87)])], instrumented_t._calls)
 
313
        
 
314
    def test_iter_lines_reads_in_order(self):
 
315
        t = MemoryTransport()
 
316
        instrumented_t = TransportLogger(t)
 
317
        k1 = KnitVersionedFile('id', instrumented_t, create=True, delta=True)
 
318
        self.assertEqual([('id.kndx',)], instrumented_t._calls)
 
319
        # add texts with no required ordering
 
320
        k1.add_lines('base', [], ['text\n'])
 
321
        k1.add_lines('base2', [], ['text2\n'])
 
322
        k1.clear_cache()
 
323
        instrumented_t._calls = []
 
324
        # request a last-first iteration
 
325
        results = list(k1.iter_lines_added_or_present_in_versions(['base2', 'base']))
 
326
        self.assertEqual([('id.knit', [(0, 87), (87, 89)])], instrumented_t._calls)
 
327
        self.assertEqual(['text\n', 'text2\n'], results)
 
328
 
 
329
    def test_create_empty_annotated(self):
 
330
        k1 = self.make_test_knit(True)
 
331
        # 0
 
332
        k1.add_lines('text-a', [], ['a\n', 'b\n'])
 
333
        k2 = k1.create_empty('t', MemoryTransport())
 
334
        self.assertTrue(isinstance(k2.factory, KnitAnnotateFactory))
 
335
        self.assertEqual(k1.delta, k2.delta)
 
336
        # the generic test checks for empty content and file class
 
337
 
 
338
    def test_knit_format(self):
 
339
        # this tests that a new knit index file has the expected content
 
340
        # and that is writes the data we expect as records are added.
 
341
        knit = self.make_test_knit(True)
 
342
        self.assertFileEqual("# bzr knit index 8\n", 'test.kndx')
 
343
        knit.add_lines_with_ghosts('revid', ['a_ghost'], ['a\n'])
 
344
        self.assertFileEqual(
 
345
            "# bzr knit index 8\n"
 
346
            "\n"
 
347
            "revid fulltext 0 84 .a_ghost :",
 
348
            'test.kndx')
 
349
        knit.add_lines_with_ghosts('revid2', ['revid'], ['a\n'])
 
350
        self.assertFileEqual(
 
351
            "# bzr knit index 8\n"
 
352
            "\nrevid fulltext 0 84 .a_ghost :"
 
353
            "\nrevid2 line-delta 84 82 0 :",
 
354
            'test.kndx')
 
355
        # we should be able to load this file again
 
356
        knit = KnitVersionedFile('test', get_transport('.'), access_mode='r')
 
357
        self.assertEqual(['revid', 'revid2'], knit.versions())
 
358
        # write a short write to the file and ensure that its ignored
 
359
        indexfile = file('test.kndx', 'at')
 
360
        indexfile.write('\nrevid3 line-delta 166 82 1 2 3 4 5 .phwoar:demo ')
 
361
        indexfile.close()
 
362
        # we should be able to load this file again
 
363
        knit = KnitVersionedFile('test', get_transport('.'), access_mode='w')
 
364
        self.assertEqual(['revid', 'revid2'], knit.versions())
 
365
        # and add a revision with the same id the failed write had
 
366
        knit.add_lines('revid3', ['revid2'], ['a\n'])
 
367
        # and when reading it revid3 should now appear.
 
368
        knit = KnitVersionedFile('test', get_transport('.'), access_mode='r')
 
369
        self.assertEqual(['revid', 'revid2', 'revid3'], knit.versions())
 
370
        self.assertEqual(['revid2'], knit.get_parents('revid3'))
 
371
 
 
372
    def test_plan_merge(self):
 
373
        my_knit = self.make_test_knit(annotate=True)
 
374
        my_knit.add_lines('text1', [], split_lines(TEXT_1))
 
375
        my_knit.add_lines('text1a', ['text1'], split_lines(TEXT_1A))
 
376
        my_knit.add_lines('text1b', ['text1'], split_lines(TEXT_1B))
 
377
        plan = list(my_knit.plan_merge('text1a', 'text1b'))
 
378
        for plan_line, expected_line in zip(plan, AB_MERGE):
 
379
            self.assertEqual(plan_line, expected_line)
 
380
 
 
381
 
 
382
TEXT_1 = """\
 
383
Banana cup cakes:
 
384
 
 
385
- bananas
 
386
- eggs
 
387
- broken tea cups
 
388
"""
 
389
 
 
390
TEXT_1A = """\
 
391
Banana cup cake recipe
 
392
(serves 6)
 
393
 
 
394
- bananas
 
395
- eggs
 
396
- broken tea cups
 
397
- self-raising flour
 
398
"""
 
399
 
 
400
TEXT_1B = """\
 
401
Banana cup cake recipe
 
402
 
 
403
- bananas (do not use plantains!!!)
 
404
- broken tea cups
 
405
- flour
 
406
"""
 
407
 
 
408
delta_1_1a = """\
 
409
0,1,2
 
410
Banana cup cake recipe
 
411
(serves 6)
 
412
5,5,1
 
413
- self-raising flour
 
414
"""
 
415
 
 
416
TEXT_2 = """\
 
417
Boeuf bourguignon
 
418
 
 
419
- beef
 
420
- red wine
 
421
- small onions
 
422
- carrot
 
423
- mushrooms
 
424
"""
 
425
 
 
426
AB_MERGE_TEXT="""unchanged|Banana cup cake recipe
 
427
new-a|(serves 6)
 
428
unchanged|
 
429
killed-b|- bananas
 
430
killed-b|- eggs
 
431
new-b|- bananas (do not use plantains!!!)
 
432
unchanged|- broken tea cups
 
433
new-a|- self-raising flour
 
434
new-b|- flour
 
435
"""
 
436
AB_MERGE=[tuple(l.split('|')) for l in AB_MERGE_TEXT.splitlines(True)]
 
437
 
 
438
 
 
439
def line_delta(from_lines, to_lines):
 
440
    """Generate line-based delta from one text to another"""
 
441
    s = difflib.SequenceMatcher(None, from_lines, to_lines)
 
442
    for op in s.get_opcodes():
 
443
        if op[0] == 'equal':
 
444
            continue
 
445
        yield '%d,%d,%d\n' % (op[1], op[2], op[4]-op[3])
 
446
        for i in range(op[3], op[4]):
 
447
            yield to_lines[i]
 
448
 
 
449
 
 
450
def apply_line_delta(basis_lines, delta_lines):
 
451
    """Apply a line-based perfect diff
 
452
    
 
453
    basis_lines -- text to apply the patch to
 
454
    delta_lines -- diff instructions and content
 
455
    """
 
456
    out = basis_lines[:]
 
457
    i = 0
 
458
    offset = 0
 
459
    while i < len(delta_lines):
 
460
        l = delta_lines[i]
 
461
        a, b, c = map(long, l.split(','))
 
462
        i = i + 1
 
463
        out[offset+a:offset+b] = delta_lines[i:i+c]
 
464
        i = i + c
 
465
        offset = offset + (b - a) + c
 
466
    return out