/brz/remove-bazaar

To get this branch, use:
bzr branch http://gegoxaren.bato24.eu/bzr/brz/remove-bazaar

« back to all changes in this revision

Viewing changes to bzrlib/tests/test_knit.py

  • Committer: John Arbash Meinel
  • Date: 2006-07-18 18:57:54 UTC
  • mto: This revision was merged to the branch mainline in revision 1868.
  • Revision ID: john@arbash-meinel.com-20060718185754-4007745748e28db9
Commit timestamp restricted to 1ms precision.

The old code would restrict to 1s resolution if the timestamp was
supplied, while it preserved full resolution if the timestamp was
auto generated. Now both paths preserve only 1ms resolution.

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
# Copyright (C) 2005, 2006 by Canonical Ltd
 
2
#
 
3
# This program is free software; you can redistribute it and/or modify
 
4
# it under the terms of the GNU General Public License as published by
 
5
# the Free Software Foundation; either version 2 of the License, or
 
6
# (at your option) any later version.
 
7
#
 
8
# This program is distributed in the hope that it will be useful,
 
9
# but WITHOUT ANY WARRANTY; without even the implied warranty of
 
10
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
11
# GNU General Public License for more details.
 
12
#
 
13
# You should have received a copy of the GNU General Public License
 
14
# along with this program; if not, write to the Free Software
 
15
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
 
16
 
 
17
"""Tests for Knit data structure"""
 
18
 
 
19
 
 
20
import difflib
 
21
 
 
22
 
 
23
from bzrlib.errors import KnitError, RevisionAlreadyPresent
 
24
from bzrlib.knit import (
 
25
    KnitVersionedFile,
 
26
    KnitPlainFactory,
 
27
    KnitAnnotateFactory,
 
28
    WeaveToKnit)
 
29
from bzrlib.osutils import split_lines
 
30
from bzrlib.tests import TestCaseWithTransport
 
31
from bzrlib.transport import TransportLogger, get_transport
 
32
from bzrlib.transport.memory import MemoryTransport
 
33
from bzrlib.weave import Weave
 
34
 
 
35
 
 
36
class KnitTests(TestCaseWithTransport):
 
37
    """Class containing knit test helper routines."""
 
38
 
 
39
    def make_test_knit(self, annotate=False):
 
40
        if not annotate:
 
41
            factory = KnitPlainFactory()
 
42
        else:
 
43
            factory = None
 
44
        return KnitVersionedFile('test', get_transport('.'), access_mode='w', factory=factory, create=True)
 
45
 
 
46
 
 
47
class BasicKnitTests(KnitTests):
 
48
 
 
49
    def add_stock_one_and_one_a(self, k):
 
50
        k.add_lines('text-1', [], split_lines(TEXT_1))
 
51
        k.add_lines('text-1a', ['text-1'], split_lines(TEXT_1A))
 
52
 
 
53
    def test_knit_constructor(self):
 
54
        """Construct empty k"""
 
55
        self.make_test_knit()
 
56
 
 
57
    def test_knit_add(self):
 
58
        """Store one text in knit and retrieve"""
 
59
        k = self.make_test_knit()
 
60
        k.add_lines('text-1', [], split_lines(TEXT_1))
 
61
        self.assertTrue(k.has_version('text-1'))
 
62
        self.assertEqualDiff(''.join(k.get_lines('text-1')), TEXT_1)
 
63
 
 
64
    def test_knit_reload(self):
 
65
        # test that the content in a reloaded knit is correct
 
66
        k = self.make_test_knit()
 
67
        k.add_lines('text-1', [], split_lines(TEXT_1))
 
68
        del k
 
69
        k2 = KnitVersionedFile('test', get_transport('.'), access_mode='r', factory=KnitPlainFactory(), create=True)
 
70
        self.assertTrue(k2.has_version('text-1'))
 
71
        self.assertEqualDiff(''.join(k2.get_lines('text-1')), TEXT_1)
 
72
 
 
73
    def test_knit_several(self):
 
74
        """Store several texts in a knit"""
 
75
        k = self.make_test_knit()
 
76
        k.add_lines('text-1', [], split_lines(TEXT_1))
 
77
        k.add_lines('text-2', [], split_lines(TEXT_2))
 
78
        self.assertEqualDiff(''.join(k.get_lines('text-1')), TEXT_1)
 
79
        self.assertEqualDiff(''.join(k.get_lines('text-2')), TEXT_2)
 
80
        
 
81
    def test_repeated_add(self):
 
82
        """Knit traps attempt to replace existing version"""
 
83
        k = self.make_test_knit()
 
84
        k.add_lines('text-1', [], split_lines(TEXT_1))
 
85
        self.assertRaises(RevisionAlreadyPresent, 
 
86
                k.add_lines,
 
87
                'text-1', [], split_lines(TEXT_1))
 
88
 
 
89
    def test_empty(self):
 
90
        k = self.make_test_knit(True)
 
91
        k.add_lines('text-1', [], [])
 
92
        self.assertEquals(k.get_lines('text-1'), [])
 
93
 
 
94
    def test_incomplete(self):
 
95
        """Test if texts without a ending line-end can be inserted and
 
96
        extracted."""
 
97
        k = KnitVersionedFile('test', get_transport('.'), delta=False, create=True)
 
98
        k.add_lines('text-1', [], ['a\n',    'b'  ])
 
99
        k.add_lines('text-2', ['text-1'], ['a\rb\n', 'b\n'])
 
100
        # reopening ensures maximum room for confusion
 
101
        k = KnitVersionedFile('test', get_transport('.'), delta=False, create=True)
 
102
        self.assertEquals(k.get_lines('text-1'), ['a\n',    'b'  ])
 
103
        self.assertEquals(k.get_lines('text-2'), ['a\rb\n', 'b\n'])
 
104
 
 
105
    def test_delta(self):
 
106
        """Expression of knit delta as lines"""
 
107
        k = self.make_test_knit()
 
108
        td = list(line_delta(TEXT_1.splitlines(True),
 
109
                             TEXT_1A.splitlines(True)))
 
110
        self.assertEqualDiff(''.join(td), delta_1_1a)
 
111
        out = apply_line_delta(TEXT_1.splitlines(True), td)
 
112
        self.assertEqualDiff(''.join(out), TEXT_1A)
 
113
 
 
114
    def test_add_with_parents(self):
 
115
        """Store in knit with parents"""
 
116
        k = self.make_test_knit()
 
117
        self.add_stock_one_and_one_a(k)
 
118
        self.assertEquals(k.get_parents('text-1'), [])
 
119
        self.assertEquals(k.get_parents('text-1a'), ['text-1'])
 
120
 
 
121
    def test_ancestry(self):
 
122
        """Store in knit with parents"""
 
123
        k = self.make_test_knit()
 
124
        self.add_stock_one_and_one_a(k)
 
125
        self.assertEquals(set(k.get_ancestry(['text-1a'])), set(['text-1a', 'text-1']))
 
126
 
 
127
    def test_add_delta(self):
 
128
        """Store in knit with parents"""
 
129
        k = KnitVersionedFile('test', get_transport('.'), factory=KnitPlainFactory(),
 
130
            delta=True, create=True)
 
131
        self.add_stock_one_and_one_a(k)
 
132
        k.clear_cache()
 
133
        self.assertEqualDiff(''.join(k.get_lines('text-1a')), TEXT_1A)
 
134
 
 
135
    def test_annotate(self):
 
136
        """Annotations"""
 
137
        k = KnitVersionedFile('knit', get_transport('.'), factory=KnitAnnotateFactory(),
 
138
            delta=True, create=True)
 
139
        self.insert_and_test_small_annotate(k)
 
140
 
 
141
    def insert_and_test_small_annotate(self, k):
 
142
        """test annotation with k works correctly."""
 
143
        k.add_lines('text-1', [], ['a\n', 'b\n'])
 
144
        k.add_lines('text-2', ['text-1'], ['a\n', 'c\n'])
 
145
 
 
146
        origins = k.annotate('text-2')
 
147
        self.assertEquals(origins[0], ('text-1', 'a\n'))
 
148
        self.assertEquals(origins[1], ('text-2', 'c\n'))
 
149
 
 
150
    def test_annotate_fulltext(self):
 
151
        """Annotations"""
 
152
        k = KnitVersionedFile('knit', get_transport('.'), factory=KnitAnnotateFactory(),
 
153
            delta=False, create=True)
 
154
        self.insert_and_test_small_annotate(k)
 
155
 
 
156
    def test_annotate_merge_1(self):
 
157
        k = self.make_test_knit(True)
 
158
        k.add_lines('text-a1', [], ['a\n', 'b\n'])
 
159
        k.add_lines('text-a2', [], ['d\n', 'c\n'])
 
160
        k.add_lines('text-am', ['text-a1', 'text-a2'], ['d\n', 'b\n'])
 
161
        origins = k.annotate('text-am')
 
162
        self.assertEquals(origins[0], ('text-a2', 'd\n'))
 
163
        self.assertEquals(origins[1], ('text-a1', 'b\n'))
 
164
 
 
165
    def test_annotate_merge_2(self):
 
166
        k = self.make_test_knit(True)
 
167
        k.add_lines('text-a1', [], ['a\n', 'b\n', 'c\n'])
 
168
        k.add_lines('text-a2', [], ['x\n', 'y\n', 'z\n'])
 
169
        k.add_lines('text-am', ['text-a1', 'text-a2'], ['a\n', 'y\n', 'c\n'])
 
170
        origins = k.annotate('text-am')
 
171
        self.assertEquals(origins[0], ('text-a1', 'a\n'))
 
172
        self.assertEquals(origins[1], ('text-a2', 'y\n'))
 
173
        self.assertEquals(origins[2], ('text-a1', 'c\n'))
 
174
 
 
175
    def test_annotate_merge_9(self):
 
176
        k = self.make_test_knit(True)
 
177
        k.add_lines('text-a1', [], ['a\n', 'b\n', 'c\n'])
 
178
        k.add_lines('text-a2', [], ['x\n', 'y\n', 'z\n'])
 
179
        k.add_lines('text-am', ['text-a1', 'text-a2'], ['k\n', 'y\n', 'c\n'])
 
180
        origins = k.annotate('text-am')
 
181
        self.assertEquals(origins[0], ('text-am', 'k\n'))
 
182
        self.assertEquals(origins[1], ('text-a2', 'y\n'))
 
183
        self.assertEquals(origins[2], ('text-a1', 'c\n'))
 
184
 
 
185
    def test_annotate_merge_3(self):
 
186
        k = self.make_test_knit(True)
 
187
        k.add_lines('text-a1', [], ['a\n', 'b\n', 'c\n'])
 
188
        k.add_lines('text-a2', [] ,['x\n', 'y\n', 'z\n'])
 
189
        k.add_lines('text-am', ['text-a1', 'text-a2'], ['k\n', 'y\n', 'z\n'])
 
190
        origins = k.annotate('text-am')
 
191
        self.assertEquals(origins[0], ('text-am', 'k\n'))
 
192
        self.assertEquals(origins[1], ('text-a2', 'y\n'))
 
193
        self.assertEquals(origins[2], ('text-a2', 'z\n'))
 
194
 
 
195
    def test_annotate_merge_4(self):
 
196
        k = self.make_test_knit(True)
 
197
        k.add_lines('text-a1', [], ['a\n', 'b\n', 'c\n'])
 
198
        k.add_lines('text-a2', [], ['x\n', 'y\n', 'z\n'])
 
199
        k.add_lines('text-a3', ['text-a1'], ['a\n', 'b\n', 'p\n'])
 
200
        k.add_lines('text-am', ['text-a2', 'text-a3'], ['a\n', 'b\n', 'z\n'])
 
201
        origins = k.annotate('text-am')
 
202
        self.assertEquals(origins[0], ('text-a1', 'a\n'))
 
203
        self.assertEquals(origins[1], ('text-a1', 'b\n'))
 
204
        self.assertEquals(origins[2], ('text-a2', 'z\n'))
 
205
 
 
206
    def test_annotate_merge_5(self):
 
207
        k = self.make_test_knit(True)
 
208
        k.add_lines('text-a1', [], ['a\n', 'b\n', 'c\n'])
 
209
        k.add_lines('text-a2', [], ['d\n', 'e\n', 'f\n'])
 
210
        k.add_lines('text-a3', [], ['x\n', 'y\n', 'z\n'])
 
211
        k.add_lines('text-am',
 
212
                    ['text-a1', 'text-a2', 'text-a3'],
 
213
                    ['a\n', 'e\n', 'z\n'])
 
214
        origins = k.annotate('text-am')
 
215
        self.assertEquals(origins[0], ('text-a1', 'a\n'))
 
216
        self.assertEquals(origins[1], ('text-a2', 'e\n'))
 
217
        self.assertEquals(origins[2], ('text-a3', 'z\n'))
 
218
 
 
219
    def test_annotate_file_cherry_pick(self):
 
220
        k = self.make_test_knit(True)
 
221
        k.add_lines('text-1', [], ['a\n', 'b\n', 'c\n'])
 
222
        k.add_lines('text-2', ['text-1'], ['d\n', 'e\n', 'f\n'])
 
223
        k.add_lines('text-3', ['text-2', 'text-1'], ['a\n', 'b\n', 'c\n'])
 
224
        origins = k.annotate('text-3')
 
225
        self.assertEquals(origins[0], ('text-1', 'a\n'))
 
226
        self.assertEquals(origins[1], ('text-1', 'b\n'))
 
227
        self.assertEquals(origins[2], ('text-1', 'c\n'))
 
228
 
 
229
    def test_knit_join(self):
 
230
        """Store in knit with parents"""
 
231
        k1 = KnitVersionedFile('test1', get_transport('.'), factory=KnitPlainFactory(), create=True)
 
232
        k1.add_lines('text-a', [], split_lines(TEXT_1))
 
233
        k1.add_lines('text-b', ['text-a'], split_lines(TEXT_1))
 
234
 
 
235
        k1.add_lines('text-c', [], split_lines(TEXT_1))
 
236
        k1.add_lines('text-d', ['text-c'], split_lines(TEXT_1))
 
237
 
 
238
        k1.add_lines('text-m', ['text-b', 'text-d'], split_lines(TEXT_1))
 
239
 
 
240
        k2 = KnitVersionedFile('test2', get_transport('.'), factory=KnitPlainFactory(), create=True)
 
241
        count = k2.join(k1, version_ids=['text-m'])
 
242
        self.assertEquals(count, 5)
 
243
        self.assertTrue(k2.has_version('text-a'))
 
244
        self.assertTrue(k2.has_version('text-c'))
 
245
 
 
246
    def test_reannotate(self):
 
247
        k1 = KnitVersionedFile('knit1', get_transport('.'),
 
248
                               factory=KnitAnnotateFactory(), create=True)
 
249
        # 0
 
250
        k1.add_lines('text-a', [], ['a\n', 'b\n'])
 
251
        # 1
 
252
        k1.add_lines('text-b', ['text-a'], ['a\n', 'c\n'])
 
253
 
 
254
        k2 = KnitVersionedFile('test2', get_transport('.'),
 
255
                               factory=KnitAnnotateFactory(), create=True)
 
256
        k2.join(k1, version_ids=['text-b'])
 
257
 
 
258
        # 2
 
259
        k1.add_lines('text-X', ['text-b'], ['a\n', 'b\n'])
 
260
        # 2
 
261
        k2.add_lines('text-c', ['text-b'], ['z\n', 'c\n'])
 
262
        # 3
 
263
        k2.add_lines('text-Y', ['text-b'], ['b\n', 'c\n'])
 
264
 
 
265
        # test-c will have index 3
 
266
        k1.join(k2, version_ids=['text-c'])
 
267
 
 
268
        lines = k1.get_lines('text-c')
 
269
        self.assertEquals(lines, ['z\n', 'c\n'])
 
270
 
 
271
        origins = k1.annotate('text-c')
 
272
        self.assertEquals(origins[0], ('text-c', 'z\n'))
 
273
        self.assertEquals(origins[1], ('text-b', 'c\n'))
 
274
 
 
275
    def test_get_line_delta_texts(self):
 
276
        """Make sure we can call get_texts on text with reused line deltas"""
 
277
        k1 = KnitVersionedFile('test1', get_transport('.'), 
 
278
                               factory=KnitPlainFactory(), create=True)
 
279
        for t in range(3):
 
280
            if t == 0:
 
281
                parents = []
 
282
            else:
 
283
                parents = ['%d' % (t-1)]
 
284
            k1.add_lines('%d' % t, parents, ['hello\n'] * t)
 
285
        k1.get_texts(('%d' % t) for t in range(3))
 
286
        
 
287
    def test_iter_lines_reads_in_order(self):
 
288
        t = MemoryTransport()
 
289
        instrumented_t = TransportLogger(t)
 
290
        k1 = KnitVersionedFile('id', instrumented_t, create=True, delta=True)
 
291
        self.assertEqual([('id.kndx',)], instrumented_t._calls)
 
292
        # add texts with no required ordering
 
293
        k1.add_lines('base', [], ['text\n'])
 
294
        k1.add_lines('base2', [], ['text2\n'])
 
295
        k1.clear_cache()
 
296
        instrumented_t._calls = []
 
297
        # request a last-first iteration
 
298
        results = list(k1.iter_lines_added_or_present_in_versions(['base2', 'base']))
 
299
        self.assertEqual([('id.knit', [(0, 87), (87, 89)])], instrumented_t._calls)
 
300
        self.assertEqual(['text\n', 'text2\n'], results)
 
301
 
 
302
    def test_create_empty_annotated(self):
 
303
        k1 = self.make_test_knit(True)
 
304
        # 0
 
305
        k1.add_lines('text-a', [], ['a\n', 'b\n'])
 
306
        k2 = k1.create_empty('t', MemoryTransport())
 
307
        self.assertTrue(isinstance(k2.factory, KnitAnnotateFactory))
 
308
        self.assertEqual(k1.delta, k2.delta)
 
309
        # the generic test checks for empty content and file class
 
310
 
 
311
    def test_knit_format(self):
 
312
        # this tests that a new knit index file has the expected content
 
313
        # and that is writes the data we expect as records are added.
 
314
        knit = self.make_test_knit(True)
 
315
        self.assertFileEqual("# bzr knit index 8\n", 'test.kndx')
 
316
        knit.add_lines_with_ghosts('revid', ['a_ghost'], ['a\n'])
 
317
        self.assertFileEqual(
 
318
            "# bzr knit index 8\n"
 
319
            "\n"
 
320
            "revid fulltext 0 84 .a_ghost :",
 
321
            'test.kndx')
 
322
        knit.add_lines_with_ghosts('revid2', ['revid'], ['a\n'])
 
323
        self.assertFileEqual(
 
324
            "# bzr knit index 8\n"
 
325
            "\nrevid fulltext 0 84 .a_ghost :"
 
326
            "\nrevid2 line-delta 84 82 0 :",
 
327
            'test.kndx')
 
328
        # we should be able to load this file again
 
329
        knit = KnitVersionedFile('test', get_transport('.'), access_mode='r')
 
330
        self.assertEqual(['revid', 'revid2'], knit.versions())
 
331
        # write a short write to the file and ensure that its ignored
 
332
        indexfile = file('test.kndx', 'at')
 
333
        indexfile.write('\nrevid3 line-delta 166 82 1 2 3 4 5 .phwoar:demo ')
 
334
        indexfile.close()
 
335
        # we should be able to load this file again
 
336
        knit = KnitVersionedFile('test', get_transport('.'), access_mode='w')
 
337
        self.assertEqual(['revid', 'revid2'], knit.versions())
 
338
        # and add a revision with the same id the failed write had
 
339
        knit.add_lines('revid3', ['revid2'], ['a\n'])
 
340
        # and when reading it revid3 should now appear.
 
341
        knit = KnitVersionedFile('test', get_transport('.'), access_mode='r')
 
342
        self.assertEqual(['revid', 'revid2', 'revid3'], knit.versions())
 
343
        self.assertEqual(['revid2'], knit.get_parents('revid3'))
 
344
 
 
345
    def test_plan_merge(self):
 
346
        my_knit = self.make_test_knit(annotate=True)
 
347
        my_knit.add_lines('text1', [], split_lines(TEXT_1))
 
348
        my_knit.add_lines('text1a', ['text1'], split_lines(TEXT_1A))
 
349
        my_knit.add_lines('text1b', ['text1'], split_lines(TEXT_1B))
 
350
        plan = list(my_knit.plan_merge('text1a', 'text1b'))
 
351
        for plan_line, expected_line in zip(plan, AB_MERGE):
 
352
            self.assertEqual(plan_line, expected_line)
 
353
 
 
354
 
 
355
TEXT_1 = """\
 
356
Banana cup cakes:
 
357
 
 
358
- bananas
 
359
- eggs
 
360
- broken tea cups
 
361
"""
 
362
 
 
363
TEXT_1A = """\
 
364
Banana cup cake recipe
 
365
(serves 6)
 
366
 
 
367
- bananas
 
368
- eggs
 
369
- broken tea cups
 
370
- self-raising flour
 
371
"""
 
372
 
 
373
TEXT_1B = """\
 
374
Banana cup cake recipe
 
375
 
 
376
- bananas (do not use plantains!!!)
 
377
- broken tea cups
 
378
- flour
 
379
"""
 
380
 
 
381
delta_1_1a = """\
 
382
0,1,2
 
383
Banana cup cake recipe
 
384
(serves 6)
 
385
5,5,1
 
386
- self-raising flour
 
387
"""
 
388
 
 
389
TEXT_2 = """\
 
390
Boeuf bourguignon
 
391
 
 
392
- beef
 
393
- red wine
 
394
- small onions
 
395
- carrot
 
396
- mushrooms
 
397
"""
 
398
 
 
399
AB_MERGE_TEXT="""unchanged|Banana cup cake recipe
 
400
new-a|(serves 6)
 
401
unchanged|
 
402
killed-b|- bananas
 
403
killed-b|- eggs
 
404
new-b|- bananas (do not use plantains!!!)
 
405
unchanged|- broken tea cups
 
406
new-a|- self-raising flour
 
407
new-b|- flour
 
408
"""
 
409
AB_MERGE=[tuple(l.split('|')) for l in AB_MERGE_TEXT.splitlines(True)]
 
410
 
 
411
 
 
412
def line_delta(from_lines, to_lines):
 
413
    """Generate line-based delta from one text to another"""
 
414
    s = difflib.SequenceMatcher(None, from_lines, to_lines)
 
415
    for op in s.get_opcodes():
 
416
        if op[0] == 'equal':
 
417
            continue
 
418
        yield '%d,%d,%d\n' % (op[1], op[2], op[4]-op[3])
 
419
        for i in range(op[3], op[4]):
 
420
            yield to_lines[i]
 
421
 
 
422
 
 
423
def apply_line_delta(basis_lines, delta_lines):
 
424
    """Apply a line-based perfect diff
 
425
    
 
426
    basis_lines -- text to apply the patch to
 
427
    delta_lines -- diff instructions and content
 
428
    """
 
429
    out = basis_lines[:]
 
430
    i = 0
 
431
    offset = 0
 
432
    while i < len(delta_lines):
 
433
        l = delta_lines[i]
 
434
        a, b, c = map(long, l.split(','))
 
435
        i = i + 1
 
436
        out[offset+a:offset+b] = delta_lines[i:i+c]
 
437
        i = i + c
 
438
        offset = offset + (b - a) + c
 
439
    return out
 
440
 
 
441
 
 
442
class TestWeaveToKnit(KnitTests):
 
443
 
 
444
    def test_weave_to_knit_matches(self):
 
445
        # check that the WeaveToKnit is_compatible function
 
446
        # registers True for a Weave to a Knit.
 
447
        w = Weave()
 
448
        k = self.make_test_knit()
 
449
        self.failUnless(WeaveToKnit.is_compatible(w, k))
 
450
        self.failIf(WeaveToKnit.is_compatible(k, w))
 
451
        self.failIf(WeaveToKnit.is_compatible(w, w))
 
452
        self.failIf(WeaveToKnit.is_compatible(k, k))