/brz/remove-bazaar

To get this branch, use:
bzr branch http://gegoxaren.bato24.eu/bzr/brz/remove-bazaar

« back to all changes in this revision

Viewing changes to bzrlib/tests/test_weave.py

  • Committer: Robert Collins
  • Date: 2006-03-06 09:38:37 UTC
  • mto: (1594.2.4 integration)
  • mto: This revision was merged to the branch mainline in revision 1596.
  • Revision ID: robertc@robertcollins.net-20060306093837-b151989e9572895e
Remove all but fetch references to repository.revision_store.

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
#! /usr/bin/python2.4
 
2
 
 
3
# Copyright (C) 2005 by Canonical Ltd
 
4
 
 
5
# This program is free software; you can redistribute it and/or modify
 
6
# it under the terms of the GNU General Public License as published by
 
7
# the Free Software Foundation; either version 2 of the License, or
 
8
# (at your option) any later version.
 
9
 
 
10
# This program is distributed in the hope that it will be useful,
 
11
# but WITHOUT ANY WARRANTY; without even the implied warranty of
 
12
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
13
# GNU General Public License for more details.
 
14
 
 
15
# You should have received a copy of the GNU General Public License
 
16
# along with this program; if not, write to the Free Software
 
17
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
 
18
 
 
19
 
 
20
# TODO: tests regarding version names
 
21
# TODO: rbc 20050108 test that join does not leave an inconsistent weave 
 
22
#       if it fails.
 
23
 
 
24
"""test suite for weave algorithm"""
 
25
 
 
26
from pprint import pformat
 
27
 
 
28
import bzrlib.errors as errors
 
29
from bzrlib.weave import Weave, WeaveFormatError, WeaveError, reweave
 
30
from bzrlib.weavefile import write_weave, read_weave
 
31
from bzrlib.tests import TestCase
 
32
from bzrlib.osutils import sha_string
 
33
 
 
34
 
 
35
# texts for use in testing
 
36
TEXT_0 = ["Hello world"]
 
37
TEXT_1 = ["Hello world",
 
38
          "A second line"]
 
39
 
 
40
 
 
41
class TestBase(TestCase):
 
42
    def check_read_write(self, k):
 
43
        """Check the weave k can be written & re-read."""
 
44
        from tempfile import TemporaryFile
 
45
        tf = TemporaryFile()
 
46
 
 
47
        write_weave(k, tf)
 
48
        tf.seek(0)
 
49
        k2 = read_weave(tf)
 
50
 
 
51
        if k != k2:
 
52
            tf.seek(0)
 
53
            self.log('serialized weave:')
 
54
            self.log(tf.read())
 
55
 
 
56
            self.log('')
 
57
            self.log('parents: %s' % (k._parents == k2._parents))
 
58
            self.log('         %r' % k._parents)
 
59
            self.log('         %r' % k2._parents)
 
60
            self.log('')
 
61
            self.fail('read/write check failed')
 
62
 
 
63
 
 
64
class WeaveContains(TestBase):
 
65
    """Weave __contains__ operator"""
 
66
    def runTest(self):
 
67
        k = Weave()
 
68
        self.assertFalse('foo' in k)
 
69
        k.add('foo', [], TEXT_1)
 
70
        self.assertTrue('foo' in k)
 
71
 
 
72
 
 
73
class Easy(TestBase):
 
74
    def runTest(self):
 
75
        k = Weave()
 
76
 
 
77
 
 
78
class StoreText(TestBase):
 
79
    """Store and retrieve a simple text."""
 
80
    def runTest(self):
 
81
        k = Weave()
 
82
        idx = k.add('text0', [], TEXT_0)
 
83
        self.assertEqual(k.get(idx), TEXT_0)
 
84
        self.assertEqual(idx, 0)
 
85
 
 
86
 
 
87
class AnnotateOne(TestBase):
 
88
    def runTest(self):
 
89
        k = Weave()
 
90
        k.add('text0', [], TEXT_0)
 
91
        self.assertEqual(k.annotate(0),
 
92
                         [(0, TEXT_0[0])])
 
93
 
 
94
 
 
95
class StoreTwo(TestBase):
 
96
    def runTest(self):
 
97
        k = Weave()
 
98
 
 
99
        idx = k.add('text0', [], TEXT_0)
 
100
        self.assertEqual(idx, 0)
 
101
 
 
102
        idx = k.add('text1', [], TEXT_1)
 
103
        self.assertEqual(idx, 1)
 
104
 
 
105
        self.assertEqual(k.get(0), TEXT_0)
 
106
        self.assertEqual(k.get(1), TEXT_1)
 
107
 
 
108
 
 
109
class AddWithGivenSha(TestBase):
 
110
    def runTest(self):
 
111
        """Add with caller-supplied SHA-1"""
 
112
        k = Weave()
 
113
 
 
114
        t = 'text0'
 
115
        k.add('text0', [], [t], sha1=sha_string(t))
 
116
 
 
117
 
 
118
class GetSha1(TestBase):
 
119
    def test_get_sha1(self):
 
120
        k = Weave()
 
121
        k.add('text0', [], 'text0')
 
122
        self.assertEqual('34dc0e430c642a26c3dd1c2beb7a8b4f4445eb79',
 
123
                         k.get_sha1('text0'))
 
124
        self.assertRaises(errors.RevisionNotPresent,
 
125
                          k.get_sha1, 0)
 
126
        self.assertRaises(errors.RevisionNotPresent,
 
127
                          k.get_sha1, 'text1')
 
128
                        
 
129
 
 
130
class InvalidAdd(TestBase):
 
131
    """Try to use invalid version number during add."""
 
132
    def runTest(self):
 
133
        k = Weave()
 
134
 
 
135
        self.assertRaises(IndexError,
 
136
                          k.add,
 
137
                          'text0',
 
138
                          [69],
 
139
                          ['new text!'])
 
140
 
 
141
 
 
142
class RepeatedAdd(TestBase):
 
143
    """Add the same version twice; harmless."""
 
144
    def runTest(self):
 
145
        k = Weave()
 
146
        idx = k.add('text0', [], TEXT_0)
 
147
        idx2 = k.add('text0', [], TEXT_0)
 
148
        self.assertEqual(idx, idx2)
 
149
 
 
150
 
 
151
class InvalidRepeatedAdd(TestBase):
 
152
    def runTest(self):
 
153
        k = Weave()
 
154
        idx = k.add('text0', [], TEXT_0)
 
155
        self.assertRaises(errors.RevisionAlreadyPresent,
 
156
                          k.add,
 
157
                          'text0',
 
158
                          [],
 
159
                          ['not the same text'])
 
160
        self.assertRaises(errors.RevisionAlreadyPresent,
 
161
                          k.add,
 
162
                          'text0',
 
163
                          [12],         # not the right parents
 
164
                          TEXT_0)
 
165
        
 
166
 
 
167
class InsertLines(TestBase):
 
168
    """Store a revision that adds one line to the original.
 
169
 
 
170
    Look at the annotations to make sure that the first line is matched
 
171
    and not stored repeatedly."""
 
172
    def runTest(self):
 
173
        k = Weave()
 
174
 
 
175
        k.add('text0', [], ['line 1'])
 
176
        k.add('text1', [0], ['line 1', 'line 2'])
 
177
 
 
178
        self.assertEqual(k.annotate(0),
 
179
                         [(0, 'line 1')])
 
180
 
 
181
        self.assertEqual(k.get(1),
 
182
                         ['line 1',
 
183
                          'line 2'])
 
184
 
 
185
        self.assertEqual(k.annotate(1),
 
186
                         [(0, 'line 1'),
 
187
                          (1, 'line 2')])
 
188
 
 
189
        k.add('text2', [0], ['line 1', 'diverged line'])
 
190
 
 
191
        self.assertEqual(k.annotate(2),
 
192
                         [(0, 'line 1'),
 
193
                          (2, 'diverged line')])
 
194
 
 
195
        text3 = ['line 1', 'middle line', 'line 2']
 
196
        k.add('text3',
 
197
              [0, 1],
 
198
              text3)
 
199
 
 
200
        # self.log("changes to text3: " + pformat(list(k._delta(set([0, 1]), text3))))
 
201
 
 
202
        self.log("k._weave=" + pformat(k._weave))
 
203
 
 
204
        self.assertEqual(k.annotate(3),
 
205
                         [(0, 'line 1'),
 
206
                          (3, 'middle line'),
 
207
                          (1, 'line 2')])
 
208
 
 
209
        # now multiple insertions at different places
 
210
        k.add('text4',
 
211
              [0, 1, 3],
 
212
              ['line 1', 'aaa', 'middle line', 'bbb', 'line 2', 'ccc'])
 
213
 
 
214
        self.assertEqual(k.annotate(4), 
 
215
                         [(0, 'line 1'),
 
216
                          (4, 'aaa'),
 
217
                          (3, 'middle line'),
 
218
                          (4, 'bbb'),
 
219
                          (1, 'line 2'),
 
220
                          (4, 'ccc')])
 
221
 
 
222
 
 
223
class DeleteLines(TestBase):
 
224
    """Deletion of lines from existing text.
 
225
 
 
226
    Try various texts all based on a common ancestor."""
 
227
    def runTest(self):
 
228
        k = Weave()
 
229
 
 
230
        base_text = ['one', 'two', 'three', 'four']
 
231
 
 
232
        k.add('text0', [], base_text)
 
233
        
 
234
        texts = [['one', 'two', 'three'],
 
235
                 ['two', 'three', 'four'],
 
236
                 ['one', 'four'],
 
237
                 ['one', 'two', 'three', 'four'],
 
238
                 ]
 
239
 
 
240
        i = 1
 
241
        for t in texts:
 
242
            ver = k.add('text%d' % i,
 
243
                        [0], t)
 
244
            i += 1
 
245
 
 
246
        self.log('final weave:')
 
247
        self.log('k._weave=' + pformat(k._weave))
 
248
 
 
249
        for i in range(len(texts)):
 
250
            self.assertEqual(k.get(i+1),
 
251
                             texts[i])
 
252
 
 
253
 
 
254
class SuicideDelete(TestBase):
 
255
    """Invalid weave which tries to add and delete simultaneously."""
 
256
    def runTest(self):
 
257
        k = Weave()
 
258
 
 
259
        k._parents = [(),
 
260
                ]
 
261
        k._weave = [('{', 0),
 
262
                'first line',
 
263
                ('[', 0),
 
264
                'deleted in 0',
 
265
                (']', 0),
 
266
                ('}', 0),
 
267
                ]
 
268
        ################################### SKIPPED
 
269
        # Weave.get doesn't trap this anymore
 
270
        return 
 
271
 
 
272
        self.assertRaises(WeaveFormatError,
 
273
                          k.get,
 
274
                          0)        
 
275
 
 
276
 
 
277
class CannedDelete(TestBase):
 
278
    """Unpack canned weave with deleted lines."""
 
279
    def runTest(self):
 
280
        k = Weave()
 
281
 
 
282
        k._parents = [(),
 
283
                frozenset([0]),
 
284
                ]
 
285
        k._weave = [('{', 0),
 
286
                'first line',
 
287
                ('[', 1),
 
288
                'line to be deleted',
 
289
                (']', 1),
 
290
                'last line',
 
291
                ('}', 0),
 
292
                ]
 
293
        k._sha1s = [sha_string('first lineline to be deletedlast line')
 
294
                  , sha_string('first linelast line')]
 
295
 
 
296
        self.assertEqual(k.get(0),
 
297
                         ['first line',
 
298
                          'line to be deleted',
 
299
                          'last line',
 
300
                          ])
 
301
 
 
302
        self.assertEqual(k.get(1),
 
303
                         ['first line',
 
304
                          'last line',
 
305
                          ])
 
306
 
 
307
 
 
308
class CannedReplacement(TestBase):
 
309
    """Unpack canned weave with deleted lines."""
 
310
    def runTest(self):
 
311
        k = Weave()
 
312
 
 
313
        k._parents = [frozenset(),
 
314
                frozenset([0]),
 
315
                ]
 
316
        k._weave = [('{', 0),
 
317
                'first line',
 
318
                ('[', 1),
 
319
                'line to be deleted',
 
320
                (']', 1),
 
321
                ('{', 1),
 
322
                'replacement line',                
 
323
                ('}', 1),
 
324
                'last line',
 
325
                ('}', 0),
 
326
                ]
 
327
        k._sha1s = [sha_string('first lineline to be deletedlast line')
 
328
                  , sha_string('first linereplacement linelast line')]
 
329
 
 
330
        self.assertEqual(k.get(0),
 
331
                         ['first line',
 
332
                          'line to be deleted',
 
333
                          'last line',
 
334
                          ])
 
335
 
 
336
        self.assertEqual(k.get(1),
 
337
                         ['first line',
 
338
                          'replacement line',
 
339
                          'last line',
 
340
                          ])
 
341
 
 
342
 
 
343
class BadWeave(TestBase):
 
344
    """Test that we trap an insert which should not occur."""
 
345
    def runTest(self):
 
346
        k = Weave()
 
347
 
 
348
        k._parents = [frozenset(),
 
349
                ]
 
350
        k._weave = ['bad line',
 
351
                ('{', 0),
 
352
                'foo {',
 
353
                ('{', 1),
 
354
                '  added in version 1',
 
355
                ('{', 2),
 
356
                '  added in v2',
 
357
                ('}', 2),
 
358
                '  also from v1',
 
359
                ('}', 1),
 
360
                '}',
 
361
                ('}', 0)]
 
362
 
 
363
        ################################### SKIPPED
 
364
        # Weave.get doesn't trap this anymore
 
365
        return 
 
366
 
 
367
 
 
368
        self.assertRaises(WeaveFormatError,
 
369
                          k.get,
 
370
                          0)
 
371
 
 
372
 
 
373
class BadInsert(TestBase):
 
374
    """Test that we trap an insert which should not occur."""
 
375
    def runTest(self):
 
376
        k = Weave()
 
377
 
 
378
        k._parents = [frozenset(),
 
379
                frozenset([0]),
 
380
                frozenset([0]),
 
381
                frozenset([0,1,2]),
 
382
                ]
 
383
        k._weave = [('{', 0),
 
384
                'foo {',
 
385
                ('{', 1),
 
386
                '  added in version 1',
 
387
                ('{', 1),
 
388
                '  more in 1',
 
389
                ('}', 1),
 
390
                ('}', 1),
 
391
                ('}', 0)]
 
392
 
 
393
 
 
394
        # this is not currently enforced by get
 
395
        return  ##########################################
 
396
 
 
397
        self.assertRaises(WeaveFormatError,
 
398
                          k.get,
 
399
                          0)
 
400
 
 
401
        self.assertRaises(WeaveFormatError,
 
402
                          k.get,
 
403
                          1)
 
404
 
 
405
 
 
406
class InsertNested(TestBase):
 
407
    """Insertion with nested instructions."""
 
408
    def runTest(self):
 
409
        k = Weave()
 
410
 
 
411
        k._parents = [frozenset(),
 
412
                frozenset([0]),
 
413
                frozenset([0]),
 
414
                frozenset([0,1,2]),
 
415
                ]
 
416
        k._weave = [('{', 0),
 
417
                'foo {',
 
418
                ('{', 1),
 
419
                '  added in version 1',
 
420
                ('{', 2),
 
421
                '  added in v2',
 
422
                ('}', 2),
 
423
                '  also from v1',
 
424
                ('}', 1),
 
425
                '}',
 
426
                ('}', 0)]
 
427
 
 
428
        k._sha1s = [sha_string('foo {}')
 
429
                  , sha_string('foo {  added in version 1  also from v1}')
 
430
                  , sha_string('foo {  added in v2}')
 
431
                  , sha_string('foo {  added in version 1  added in v2  also from v1}')
 
432
                  ]
 
433
 
 
434
        self.assertEqual(k.get(0),
 
435
                         ['foo {',
 
436
                          '}'])
 
437
 
 
438
        self.assertEqual(k.get(1),
 
439
                         ['foo {',
 
440
                          '  added in version 1',
 
441
                          '  also from v1',
 
442
                          '}'])
 
443
                       
 
444
        self.assertEqual(k.get(2),
 
445
                         ['foo {',
 
446
                          '  added in v2',
 
447
                          '}'])
 
448
 
 
449
        self.assertEqual(k.get(3),
 
450
                         ['foo {',
 
451
                          '  added in version 1',
 
452
                          '  added in v2',
 
453
                          '  also from v1',
 
454
                          '}'])
 
455
                         
 
456
 
 
457
class DeleteLines2(TestBase):
 
458
    """Test recording revisions that delete lines.
 
459
 
 
460
    This relies on the weave having a way to represent lines knocked
 
461
    out by a later revision."""
 
462
    def runTest(self):
 
463
        k = Weave()
 
464
 
 
465
        k.add('text0', [], ["line the first",
 
466
                   "line 2",
 
467
                   "line 3",
 
468
                   "fine"])
 
469
 
 
470
        self.assertEqual(len(k.get(0)), 4)
 
471
 
 
472
        k.add('text1', [0], ["line the first",
 
473
                   "fine"])
 
474
 
 
475
        self.assertEqual(k.get(1),
 
476
                         ["line the first",
 
477
                          "fine"])
 
478
 
 
479
        self.assertEqual(k.annotate(1),
 
480
                         [(0, "line the first"),
 
481
                          (0, "fine")])
 
482
 
 
483
 
 
484
class IncludeVersions(TestBase):
 
485
    """Check texts that are stored across multiple revisions.
 
486
 
 
487
    Here we manually create a weave with particular encoding and make
 
488
    sure it unpacks properly.
 
489
 
 
490
    Text 0 includes nothing; text 1 includes text 0 and adds some
 
491
    lines.
 
492
    """
 
493
 
 
494
    def runTest(self):
 
495
        k = Weave()
 
496
 
 
497
        k._parents = [frozenset(), frozenset([0])]
 
498
        k._weave = [('{', 0),
 
499
                "first line",
 
500
                ('}', 0),
 
501
                ('{', 1),
 
502
                "second line",
 
503
                ('}', 1)]
 
504
 
 
505
        k._sha1s = [sha_string('first line')
 
506
                  , sha_string('first linesecond line')]
 
507
 
 
508
        self.assertEqual(k.get(1),
 
509
                         ["first line",
 
510
                          "second line"])
 
511
 
 
512
        self.assertEqual(k.get(0),
 
513
                         ["first line"])
 
514
 
 
515
 
 
516
class DivergedIncludes(TestBase):
 
517
    """Weave with two diverged texts based on version 0.
 
518
    """
 
519
    def runTest(self):
 
520
        # FIXME make the weave, dont poke at it.
 
521
        k = Weave()
 
522
 
 
523
        k._names = ['0', '1', '2']
 
524
        k._parents = [frozenset(),
 
525
                frozenset([0]),
 
526
                frozenset([0]),
 
527
                ]
 
528
        k._weave = [('{', 0),
 
529
                "first line",
 
530
                ('}', 0),
 
531
                ('{', 1),
 
532
                "second line",
 
533
                ('}', 1),
 
534
                ('{', 2),
 
535
                "alternative second line",
 
536
                ('}', 2),                
 
537
                ]
 
538
 
 
539
        k._sha1s = [sha_string('first line')
 
540
                  , sha_string('first linesecond line')
 
541
                  , sha_string('first linealternative second line')]
 
542
 
 
543
        self.assertEqual(k.get(0),
 
544
                         ["first line"])
 
545
 
 
546
        self.assertEqual(k.get(1),
 
547
                         ["first line",
 
548
                          "second line"])
 
549
 
 
550
        self.assertEqual(k.get(2),
 
551
                         ["first line",
 
552
                          "alternative second line"])
 
553
 
 
554
        self.assertEqual(list(k.inclusions([2])),
 
555
                         ['0', '2'])
 
556
 
 
557
 
 
558
class ReplaceLine(TestBase):
 
559
    def runTest(self):
 
560
        k = Weave()
 
561
 
 
562
        text0 = ['cheddar', 'stilton', 'gruyere']
 
563
        text1 = ['cheddar', 'blue vein', 'neufchatel', 'chevre']
 
564
        
 
565
        k.add('text0', [], text0)
 
566
        k.add('text1', [0], text1)
 
567
 
 
568
        self.log('k._weave=' + pformat(k._weave))
 
569
 
 
570
        self.assertEqual(k.get(0), text0)
 
571
        self.assertEqual(k.get(1), text1)
 
572
 
 
573
 
 
574
class Merge(TestBase):
 
575
    """Storage of versions that merge diverged parents"""
 
576
    def runTest(self):
 
577
        k = Weave()
 
578
 
 
579
        texts = [['header'],
 
580
                 ['header', '', 'line from 1'],
 
581
                 ['header', '', 'line from 2', 'more from 2'],
 
582
                 ['header', '', 'line from 1', 'fixup line', 'line from 2'],
 
583
                 ]
 
584
 
 
585
        k.add('text0', [], texts[0])
 
586
        k.add('text1', [0], texts[1])
 
587
        k.add('text2', [0], texts[2])
 
588
        k.add('merge', [0, 1, 2], texts[3])
 
589
 
 
590
        for i, t in enumerate(texts):
 
591
            self.assertEqual(k.get(i), t)
 
592
 
 
593
        self.assertEqual(k.annotate(3),
 
594
                         [(0, 'header'),
 
595
                          (1, ''),
 
596
                          (1, 'line from 1'),
 
597
                          (3, 'fixup line'),
 
598
                          (2, 'line from 2'),
 
599
                          ])
 
600
 
 
601
        self.assertEqual(list(k.inclusions([3])),
 
602
                         ['text0', 'text1', 'text2', 'merge'])
 
603
 
 
604
        self.log('k._weave=' + pformat(k._weave))
 
605
 
 
606
        self.check_read_write(k)
 
607
 
 
608
 
 
609
class Conflicts(TestBase):
 
610
    """Test detection of conflicting regions during a merge.
 
611
 
 
612
    A base version is inserted, then two descendents try to
 
613
    insert different lines in the same place.  These should be
 
614
    reported as a possible conflict and forwarded to the user."""
 
615
    def runTest(self):
 
616
        return  # NOT RUN
 
617
        k = Weave()
 
618
 
 
619
        k.add([], ['aaa', 'bbb'])
 
620
        k.add([0], ['aaa', '111', 'bbb'])
 
621
        k.add([1], ['aaa', '222', 'bbb'])
 
622
 
 
623
        merged = k.merge([1, 2])
 
624
 
 
625
        self.assertEquals([[['aaa']],
 
626
                           [['111'], ['222']],
 
627
                           [['bbb']]])
 
628
 
 
629
 
 
630
class NonConflict(TestBase):
 
631
    """Two descendants insert compatible changes.
 
632
 
 
633
    No conflict should be reported."""
 
634
    def runTest(self):
 
635
        return  # NOT RUN
 
636
        k = Weave()
 
637
 
 
638
        k.add([], ['aaa', 'bbb'])
 
639
        k.add([0], ['111', 'aaa', 'ccc', 'bbb'])
 
640
        k.add([1], ['aaa', 'ccc', 'bbb', '222'])
 
641
 
 
642
 
 
643
class Khayyam(TestBase):
 
644
    """Test changes to multi-line texts, and read/write"""
 
645
 
 
646
    def test_multi_line_merge(self):
 
647
        rawtexts = [
 
648
            """A Book of Verses underneath the Bough,
 
649
            A Jug of Wine, a Loaf of Bread, -- and Thou
 
650
            Beside me singing in the Wilderness --
 
651
            Oh, Wilderness were Paradise enow!""",
 
652
            
 
653
            """A Book of Verses underneath the Bough,
 
654
            A Jug of Wine, a Loaf of Bread, -- and Thou
 
655
            Beside me singing in the Wilderness --
 
656
            Oh, Wilderness were Paradise now!""",
 
657
 
 
658
            """A Book of poems underneath the tree,
 
659
            A Jug of Wine, a Loaf of Bread,
 
660
            and Thou
 
661
            Beside me singing in the Wilderness --
 
662
            Oh, Wilderness were Paradise now!
 
663
 
 
664
            -- O. Khayyam""",
 
665
 
 
666
            """A Book of Verses underneath the Bough,
 
667
            A Jug of Wine, a Loaf of Bread,
 
668
            and Thou
 
669
            Beside me singing in the Wilderness --
 
670
            Oh, Wilderness were Paradise now!""",
 
671
            ]
 
672
        texts = [[l.strip() for l in t.split('\n')] for t in rawtexts]
 
673
 
 
674
        k = Weave()
 
675
        parents = set()
 
676
        i = 0
 
677
        for t in texts:
 
678
            ver = k.add('text%d' % i,
 
679
                        list(parents), t)
 
680
            parents.add(ver)
 
681
            i += 1
 
682
 
 
683
        self.log("k._weave=" + pformat(k._weave))
 
684
 
 
685
        for i, t in enumerate(texts):
 
686
            self.assertEqual(k.get(i), t)
 
687
 
 
688
        self.check_read_write(k)
 
689
 
 
690
 
 
691
class MergeCases(TestBase):
 
692
    def doMerge(self, base, a, b, mp):
 
693
        from cStringIO import StringIO
 
694
        from textwrap import dedent
 
695
 
 
696
        def addcrlf(x):
 
697
            return x + '\n'
 
698
        
 
699
        w = Weave()
 
700
        w.add('text0', [], map(addcrlf, base))
 
701
        w.add('text1', [0], map(addcrlf, a))
 
702
        w.add('text2', [0], map(addcrlf, b))
 
703
 
 
704
        self.log('weave is:')
 
705
        tmpf = StringIO()
 
706
        write_weave(w, tmpf)
 
707
        self.log(tmpf.getvalue())
 
708
 
 
709
        self.log('merge plan:')
 
710
        p = list(w.plan_merge(1, 2))
 
711
        for state, line in p:
 
712
            if line:
 
713
                self.log('%12s | %s' % (state, line[:-1]))
 
714
 
 
715
        self.log('merge:')
 
716
        mt = StringIO()
 
717
        mt.writelines(w.weave_merge(p))
 
718
        mt.seek(0)
 
719
        self.log(mt.getvalue())
 
720
 
 
721
        mp = map(addcrlf, mp)
 
722
        self.assertEqual(mt.readlines(), mp)
 
723
        
 
724
        
 
725
    def testOneInsert(self):
 
726
        self.doMerge([],
 
727
                     ['aa'],
 
728
                     [],
 
729
                     ['aa'])
 
730
 
 
731
    def testSeparateInserts(self):
 
732
        self.doMerge(['aaa', 'bbb', 'ccc'],
 
733
                     ['aaa', 'xxx', 'bbb', 'ccc'],
 
734
                     ['aaa', 'bbb', 'yyy', 'ccc'],
 
735
                     ['aaa', 'xxx', 'bbb', 'yyy', 'ccc'])
 
736
 
 
737
    def testSameInsert(self):
 
738
        self.doMerge(['aaa', 'bbb', 'ccc'],
 
739
                     ['aaa', 'xxx', 'bbb', 'ccc'],
 
740
                     ['aaa', 'xxx', 'bbb', 'yyy', 'ccc'],
 
741
                     ['aaa', 'xxx', 'bbb', 'yyy', 'ccc'])
 
742
 
 
743
    def testOverlappedInsert(self):
 
744
        self.doMerge(['aaa', 'bbb'],
 
745
                     ['aaa', 'xxx', 'yyy', 'bbb'],
 
746
                     ['aaa', 'xxx', 'bbb'],
 
747
                     ['aaa', '<<<<<<< ', 'xxx', 'yyy', '=======', 'xxx', 
 
748
                      '>>>>>>> ', 'bbb'])
 
749
 
 
750
        # really it ought to reduce this to 
 
751
        # ['aaa', 'xxx', 'yyy', 'bbb']
 
752
 
 
753
 
 
754
    def testClashReplace(self):
 
755
        self.doMerge(['aaa'],
 
756
                     ['xxx'],
 
757
                     ['yyy', 'zzz'],
 
758
                     ['<<<<<<< ', 'xxx', '=======', 'yyy', 'zzz', 
 
759
                      '>>>>>>> '])
 
760
 
 
761
    def testNonClashInsert(self):
 
762
        self.doMerge(['aaa'],
 
763
                     ['xxx', 'aaa'],
 
764
                     ['yyy', 'zzz'],
 
765
                     ['<<<<<<< ', 'xxx', 'aaa', '=======', 'yyy', 'zzz', 
 
766
                      '>>>>>>> '])
 
767
 
 
768
        self.doMerge(['aaa'],
 
769
                     ['aaa'],
 
770
                     ['yyy', 'zzz'],
 
771
                     ['yyy', 'zzz'])
 
772
 
 
773
 
 
774
    def testDeleteAndModify(self):
 
775
        """Clashing delete and modification.
 
776
 
 
777
        If one side modifies a region and the other deletes it then
 
778
        there should be a conflict with one side blank.
 
779
        """
 
780
 
 
781
        #######################################
 
782
        # skippd, not working yet
 
783
        return
 
784
        
 
785
        self.doMerge(['aaa', 'bbb', 'ccc'],
 
786
                     ['aaa', 'ddd', 'ccc'],
 
787
                     ['aaa', 'ccc'],
 
788
                     ['<<<<<<<< ', 'aaa', '=======', '>>>>>>> ', 'ccc'])
 
789
 
 
790
 
 
791
class JoinWeavesTests(TestBase):
 
792
    def setUp(self):
 
793
        super(JoinWeavesTests, self).setUp()
 
794
        self.weave1 = Weave()
 
795
        self.lines1 = ['hello\n']
 
796
        self.lines3 = ['hello\n', 'cruel\n', 'world\n']
 
797
        self.weave1.add('v1', [], self.lines1)
 
798
        self.weave1.add('v2', [0], ['hello\n', 'world\n'])
 
799
        self.weave1.add('v3', [1], self.lines3)
 
800
        
 
801
    def test_join_empty(self):
 
802
        """Join two empty weaves."""
 
803
        eq = self.assertEqual
 
804
        w1 = Weave()
 
805
        w2 = Weave()
 
806
        w1.join(w2)
 
807
        eq(w1.numversions(), 0)
 
808
        
 
809
    def test_join_empty_to_nonempty(self):
 
810
        """Join empty weave onto nonempty."""
 
811
        self.weave1.join(Weave())
 
812
        self.assertEqual(len(self.weave1), 3)
 
813
 
 
814
    def test_join_unrelated(self):
 
815
        """Join two weaves with no history in common."""
 
816
        wb = Weave()
 
817
        wb.add('b1', [], ['line from b\n'])
 
818
        w1 = self.weave1
 
819
        w1.join(wb)
 
820
        eq = self.assertEqual
 
821
        eq(len(w1), 4)
 
822
        eq(sorted(list(w1.iter_names())),
 
823
           ['b1', 'v1', 'v2', 'v3'])
 
824
 
 
825
    def test_join_related(self):
 
826
        wa = self.weave1.copy()
 
827
        wb = self.weave1.copy()
 
828
        wa.add('a1', ['v3'], ['hello\n', 'sweet\n', 'world\n'])
 
829
        wb.add('b1', ['v3'], ['hello\n', 'pale blue\n', 'world\n'])
 
830
        eq = self.assertEquals
 
831
        eq(len(wa), 4)
 
832
        eq(len(wb), 4)
 
833
        wa.join(wb)
 
834
        eq(len(wa), 5)
 
835
        eq(wa.get_lines('b1'),
 
836
           ['hello\n', 'pale blue\n', 'world\n'])
 
837
 
 
838
    def test_join_parent_disagreement(self):
 
839
        #join reconciles differening parents into a union.
 
840
        wa = Weave()
 
841
        wb = Weave()
 
842
        wa.add('v1', [], ['hello\n'])
 
843
        wb.add('v0', [], [])
 
844
        wb.add('v1', ['v0'], ['hello\n'])
 
845
        wa.join(wb)
 
846
        self.assertEqual(['v0'], wa.get_parents('v1'))
 
847
 
 
848
    def test_join_text_disagreement(self):
 
849
        """Cannot join weaves with different texts for a version."""
 
850
        wa = Weave()
 
851
        wb = Weave()
 
852
        wa.add('v1', [], ['hello\n'])
 
853
        wb.add('v1', [], ['not\n', 'hello\n'])
 
854
        self.assertRaises(WeaveError,
 
855
                          wa.join, wb)
 
856
 
 
857
    def test_join_unordered(self):
 
858
        """Join weaves where indexes differ.
 
859
        
 
860
        The source weave contains a different version at index 0."""
 
861
        wa = self.weave1.copy()
 
862
        wb = Weave()
 
863
        wb.add('x1', [], ['line from x1\n'])
 
864
        wb.add('v1', [], ['hello\n'])
 
865
        wb.add('v2', ['v1'], ['hello\n', 'world\n'])
 
866
        wa.join(wb)
 
867
        eq = self.assertEquals
 
868
        eq(sorted(wa.iter_names()), ['v1', 'v2', 'v3', 'x1',])
 
869
        eq(wa.get_text('x1'), 'line from x1\n')
 
870
 
 
871
    def test_written_detection(self):
 
872
        # Test detection of weave file corruption.
 
873
        #
 
874
        # Make sure that we can detect if a weave file has
 
875
        # been corrupted. This doesn't test all forms of corruption,
 
876
        # but it at least helps verify the data you get, is what you want.
 
877
        from cStringIO import StringIO
 
878
 
 
879
        w = Weave()
 
880
        w.add('v1', [], ['hello\n'])
 
881
        w.add('v2', ['v1'], ['hello\n', 'there\n'])
 
882
 
 
883
        tmpf = StringIO()
 
884
        write_weave(w, tmpf)
 
885
 
 
886
        # Because we are corrupting, we need to make sure we have the exact text
 
887
        self.assertEquals('# bzr weave file v5\n'
 
888
                          'i\n1 f572d396fae9206628714fb2ce00f72e94f2258f\nn v1\n\n'
 
889
                          'i 0\n1 90f265c6e75f1c8f9ab76dcf85528352c5f215ef\nn v2\n\n'
 
890
                          'w\n{ 0\n. hello\n}\n{ 1\n. there\n}\nW\n',
 
891
                          tmpf.getvalue())
 
892
 
 
893
        # Change a single letter
 
894
        tmpf = StringIO('# bzr weave file v5\n'
 
895
                        'i\n1 f572d396fae9206628714fb2ce00f72e94f2258f\nn v1\n\n'
 
896
                        'i 0\n1 90f265c6e75f1c8f9ab76dcf85528352c5f215ef\nn v2\n\n'
 
897
                        'w\n{ 0\n. hello\n}\n{ 1\n. There\n}\nW\n')
 
898
 
 
899
        w = read_weave(tmpf)
 
900
 
 
901
        self.assertEqual('hello\n', w.get_text('v1'))
 
902
        self.assertRaises(errors.WeaveInvalidChecksum, w.get_text, 'v2')
 
903
        self.assertRaises(errors.WeaveInvalidChecksum, w.get_lines, 'v2')
 
904
        self.assertRaises(errors.WeaveInvalidChecksum, list, w.get_iter('v2'))
 
905
        self.assertRaises(errors.WeaveInvalidChecksum, w.check)
 
906
 
 
907
        # Change the sha checksum
 
908
        tmpf = StringIO('# bzr weave file v5\n'
 
909
                        'i\n1 f572d396fae9206628714fb2ce00f72e94f2258f\nn v1\n\n'
 
910
                        'i 0\n1 f0f265c6e75f1c8f9ab76dcf85528352c5f215ef\nn v2\n\n'
 
911
                        'w\n{ 0\n. hello\n}\n{ 1\n. there\n}\nW\n')
 
912
 
 
913
        w = read_weave(tmpf)
 
914
 
 
915
        self.assertEqual('hello\n', w.get_text('v1'))
 
916
        self.assertRaises(errors.WeaveInvalidChecksum, w.get_text, 'v2')
 
917
        self.assertRaises(errors.WeaveInvalidChecksum, w.get_lines, 'v2')
 
918
        self.assertRaises(errors.WeaveInvalidChecksum, list, w.get_iter('v2'))
 
919
        self.assertRaises(errors.WeaveInvalidChecksum, w.check)
 
920
 
 
921
 
 
922
class InstrumentedWeave(Weave):
 
923
    """Keep track of how many times functions are called."""
 
924
    
 
925
    def __init__(self, weave_name=None):
 
926
        self._extract_count = 0
 
927
        Weave.__init__(self, weave_name=weave_name)
 
928
 
 
929
    def _extract(self, versions):
 
930
        self._extract_count += 1
 
931
        return Weave._extract(self, versions)
 
932
 
 
933
 
 
934
class JoinOptimization(TestCase):
 
935
    """Test that Weave.join() doesn't extract all texts, only what must be done."""
 
936
 
 
937
    def test_join(self):
 
938
        w1 = InstrumentedWeave()
 
939
        w2 = InstrumentedWeave()
 
940
 
 
941
        txt0 = ['a\n']
 
942
        txt1 = ['a\n', 'b\n']
 
943
        txt2 = ['a\n', 'c\n']
 
944
        txt3 = ['a\n', 'b\n', 'c\n']
 
945
 
 
946
        w1.add('txt0', [], txt0) # extract 1a
 
947
        w2.add('txt0', [], txt0) # extract 1b
 
948
        w1.add('txt1', [0], txt1)# extract 2a
 
949
        w2.add('txt2', [0], txt2)# extract 2b
 
950
        w1.join(w2) # extract 3a to add txt2 
 
951
        w2.join(w1) # extract 3b to add txt1 
 
952
 
 
953
        w1.add('txt3', [1, 2], txt3) # extract 4a 
 
954
        w2.add('txt3', [1, 2], txt3) # extract 4b
 
955
        # These secretly have inverted parents
 
956
 
 
957
        # This should not have to do any extractions
 
958
        w1.join(w2) # NO extract, texts already present with same parents
 
959
        w2.join(w1) # NO extract, texts already present with same parents
 
960
 
 
961
        self.assertEqual(4, w1._extract_count)
 
962
        self.assertEqual(4, w2._extract_count)
 
963
 
 
964
    def test_double_parent(self):
 
965
        # It should not be considered illegal to add
 
966
        # a revision with the same parent twice
 
967
        w1 = InstrumentedWeave()
 
968
        w2 = InstrumentedWeave()
 
969
 
 
970
        txt0 = ['a\n']
 
971
        txt1 = ['a\n', 'b\n']
 
972
        txt2 = ['a\n', 'c\n']
 
973
        txt3 = ['a\n', 'b\n', 'c\n']
 
974
 
 
975
        w1.add('txt0', [], txt0)
 
976
        w2.add('txt0', [], txt0)
 
977
        w1.add('txt1', [0], txt1)
 
978
        w2.add('txt1', [0,0], txt1)
 
979
        # Same text, effectively the same, because the
 
980
        # parent is only repeated
 
981
        w1.join(w2) # extract 3a to add txt2 
 
982
        w2.join(w1) # extract 3b to add txt1 
 
983
 
 
984
 
 
985
class MismatchedTexts(TestCase):
 
986
    """Test that merging two weaves with different texts fails."""
 
987
 
 
988
    def test_reweave(self):
 
989
        w1 = Weave('a')
 
990
        w2 = Weave('b')
 
991
 
 
992
        w1.add('txt0', [], ['a\n'])
 
993
        w2.add('txt0', [], ['a\n'])
 
994
        w1.add('txt1', [0], ['a\n', 'b\n'])
 
995
        w2.add('txt1', [0], ['a\n', 'c\n'])
 
996
 
 
997
        self.assertRaises(errors.WeaveTextDiffers, w1.reweave, w2)
 
998
 
 
999
 
 
1000
class TestNeedsRweave(TestCase):
 
1001
    """Internal corner cases for when reweave is needed."""
 
1002
 
 
1003
    def test_compatible_parents(self):
 
1004
        w1 = Weave('a')
 
1005
        my_parents = set([1, 2, 3])
 
1006
        # subsets are ok
 
1007
        self.assertTrue(w1._compatible_parents(my_parents, set([3])))
 
1008
        # same sets
 
1009
        self.assertTrue(w1._compatible_parents(my_parents, set(my_parents)))
 
1010
        # same empty corner case
 
1011
        self.assertTrue(w1._compatible_parents(set(), set()))
 
1012
        # other cannot contain stuff my_parents does not
 
1013
        self.assertFalse(w1._compatible_parents(set(), set([1])))
 
1014
        self.assertFalse(w1._compatible_parents(my_parents, set([1, 2, 3, 4])))
 
1015
        self.assertFalse(w1._compatible_parents(my_parents, set([4])))
 
1016
        
 
1017