/brz/remove-bazaar

To get this branch, use:
bzr branch http://gegoxaren.bato24.eu/bzr/brz/remove-bazaar

« back to all changes in this revision

Viewing changes to bzrlib/tests/test_weave.py

  • Committer: Vincent Ladeuil
  • Date: 2008-05-09 16:40:21 UTC
  • mto: This revision was merged to the branch mainline in revision 3422.
  • Revision ID: v.ladeuil+lp@free.fr-20080509164021-kxtz21ozxnv16ivt
Fixed as per John's review.

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
# Copyright (C) 2005 Canonical Ltd
 
2
#
 
3
# This program is free software; you can redistribute it and/or modify
 
4
# it under the terms of the GNU General Public License as published by
 
5
# the Free Software Foundation; either version 2 of the License, or
 
6
# (at your option) any later version.
 
7
#
 
8
# This program is distributed in the hope that it will be useful,
 
9
# but WITHOUT ANY WARRANTY; without even the implied warranty of
 
10
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
11
# GNU General Public License for more details.
 
12
#
 
13
# You should have received a copy of the GNU General Public License
 
14
# along with this program; if not, write to the Free Software
 
15
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
 
16
 
 
17
 
 
18
# TODO: tests regarding version names
 
19
# TODO: rbc 20050108 test that join does not leave an inconsistent weave 
 
20
#       if it fails.
 
21
 
 
22
"""test suite for weave algorithm"""
 
23
 
 
24
from pprint import pformat
 
25
 
 
26
from bzrlib import (
 
27
    errors,
 
28
    )
 
29
from bzrlib.osutils import sha_string
 
30
from bzrlib.tests import TestCase, TestCaseInTempDir
 
31
from bzrlib.weave import Weave, WeaveFormatError, WeaveError
 
32
from bzrlib.weavefile import write_weave, read_weave
 
33
 
 
34
 
 
35
# texts for use in testing
 
36
TEXT_0 = ["Hello world"]
 
37
TEXT_1 = ["Hello world",
 
38
          "A second line"]
 
39
 
 
40
 
 
41
class TestBase(TestCase):
 
42
 
 
43
    def check_read_write(self, k):
 
44
        """Check the weave k can be written & re-read."""
 
45
        from tempfile import TemporaryFile
 
46
        tf = TemporaryFile()
 
47
 
 
48
        write_weave(k, tf)
 
49
        tf.seek(0)
 
50
        k2 = read_weave(tf)
 
51
 
 
52
        if k != k2:
 
53
            tf.seek(0)
 
54
            self.log('serialized weave:')
 
55
            self.log(tf.read())
 
56
 
 
57
            self.log('')
 
58
            self.log('parents: %s' % (k._parents == k2._parents))
 
59
            self.log('         %r' % k._parents)
 
60
            self.log('         %r' % k2._parents)
 
61
            self.log('')
 
62
            self.fail('read/write check failed')
 
63
 
 
64
 
 
65
class WeaveContains(TestBase):
 
66
    """Weave __contains__ operator"""
 
67
    def runTest(self):
 
68
        k = Weave(get_scope=lambda:None)
 
69
        self.assertFalse('foo' in k)
 
70
        k.add_lines('foo', [], TEXT_1)
 
71
        self.assertTrue('foo' in k)
 
72
 
 
73
 
 
74
class Easy(TestBase):
 
75
    def runTest(self):
 
76
        k = Weave()
 
77
 
 
78
 
 
79
class AnnotateOne(TestBase):
 
80
    def runTest(self):
 
81
        k = Weave()
 
82
        k.add_lines('text0', [], TEXT_0)
 
83
        self.assertEqual(k.annotate('text0'),
 
84
                         [('text0', TEXT_0[0])])
 
85
 
 
86
 
 
87
class InvalidAdd(TestBase):
 
88
    """Try to use invalid version number during add."""
 
89
    def runTest(self):
 
90
        k = Weave()
 
91
 
 
92
        self.assertRaises(errors.RevisionNotPresent,
 
93
                          k.add_lines,
 
94
                          'text0',
 
95
                          ['69'],
 
96
                          ['new text!'])
 
97
 
 
98
 
 
99
class RepeatedAdd(TestBase):
 
100
    """Add the same version twice; harmless."""
 
101
 
 
102
    def test_duplicate_add(self):
 
103
        k = Weave()
 
104
        idx = k.add_lines('text0', [], TEXT_0)
 
105
        idx2 = k.add_lines('text0', [], TEXT_0)
 
106
        self.assertEqual(idx, idx2)
 
107
 
 
108
 
 
109
class InvalidRepeatedAdd(TestBase):
 
110
    def runTest(self):
 
111
        k = Weave()
 
112
        k.add_lines('basis', [], TEXT_0)
 
113
        idx = k.add_lines('text0', [], TEXT_0)
 
114
        self.assertRaises(errors.RevisionAlreadyPresent,
 
115
                          k.add_lines,
 
116
                          'text0',
 
117
                          [],
 
118
                          ['not the same text'])
 
119
        self.assertRaises(errors.RevisionAlreadyPresent,
 
120
                          k.add_lines,
 
121
                          'text0',
 
122
                          ['basis'],         # not the right parents
 
123
                          TEXT_0)
 
124
        
 
125
 
 
126
class InsertLines(TestBase):
 
127
    """Store a revision that adds one line to the original.
 
128
 
 
129
    Look at the annotations to make sure that the first line is matched
 
130
    and not stored repeatedly."""
 
131
    def runTest(self):
 
132
        k = Weave()
 
133
 
 
134
        k.add_lines('text0', [], ['line 1'])
 
135
        k.add_lines('text1', ['text0'], ['line 1', 'line 2'])
 
136
 
 
137
        self.assertEqual(k.annotate('text0'),
 
138
                         [('text0', 'line 1')])
 
139
 
 
140
        self.assertEqual(k.get_lines(1),
 
141
                         ['line 1',
 
142
                          'line 2'])
 
143
 
 
144
        self.assertEqual(k.annotate('text1'),
 
145
                         [('text0', 'line 1'),
 
146
                          ('text1', 'line 2')])
 
147
 
 
148
        k.add_lines('text2', ['text0'], ['line 1', 'diverged line'])
 
149
 
 
150
        self.assertEqual(k.annotate('text2'),
 
151
                         [('text0', 'line 1'),
 
152
                          ('text2', 'diverged line')])
 
153
 
 
154
        text3 = ['line 1', 'middle line', 'line 2']
 
155
        k.add_lines('text3',
 
156
              ['text0', 'text1'],
 
157
              text3)
 
158
 
 
159
        # self.log("changes to text3: " + pformat(list(k._delta(set([0, 1]), text3))))
 
160
 
 
161
        self.log("k._weave=" + pformat(k._weave))
 
162
 
 
163
        self.assertEqual(k.annotate('text3'),
 
164
                         [('text0', 'line 1'),
 
165
                          ('text3', 'middle line'),
 
166
                          ('text1', 'line 2')])
 
167
 
 
168
        # now multiple insertions at different places
 
169
        k.add_lines('text4',
 
170
              ['text0', 'text1', 'text3'],
 
171
              ['line 1', 'aaa', 'middle line', 'bbb', 'line 2', 'ccc'])
 
172
 
 
173
        self.assertEqual(k.annotate('text4'), 
 
174
                         [('text0', 'line 1'),
 
175
                          ('text4', 'aaa'),
 
176
                          ('text3', 'middle line'),
 
177
                          ('text4', 'bbb'),
 
178
                          ('text1', 'line 2'),
 
179
                          ('text4', 'ccc')])
 
180
 
 
181
 
 
182
class DeleteLines(TestBase):
 
183
    """Deletion of lines from existing text.
 
184
 
 
185
    Try various texts all based on a common ancestor."""
 
186
    def runTest(self):
 
187
        k = Weave()
 
188
 
 
189
        base_text = ['one', 'two', 'three', 'four']
 
190
 
 
191
        k.add_lines('text0', [], base_text)
 
192
        
 
193
        texts = [['one', 'two', 'three'],
 
194
                 ['two', 'three', 'four'],
 
195
                 ['one', 'four'],
 
196
                 ['one', 'two', 'three', 'four'],
 
197
                 ]
 
198
 
 
199
        i = 1
 
200
        for t in texts:
 
201
            ver = k.add_lines('text%d' % i,
 
202
                        ['text0'], t)
 
203
            i += 1
 
204
 
 
205
        self.log('final weave:')
 
206
        self.log('k._weave=' + pformat(k._weave))
 
207
 
 
208
        for i in range(len(texts)):
 
209
            self.assertEqual(k.get_lines(i+1),
 
210
                             texts[i])
 
211
 
 
212
 
 
213
class SuicideDelete(TestBase):
 
214
    """Invalid weave which tries to add and delete simultaneously."""
 
215
    def runTest(self):
 
216
        k = Weave()
 
217
 
 
218
        k._parents = [(),
 
219
                ]
 
220
        k._weave = [('{', 0),
 
221
                'first line',
 
222
                ('[', 0),
 
223
                'deleted in 0',
 
224
                (']', 0),
 
225
                ('}', 0),
 
226
                ]
 
227
        ################################### SKIPPED
 
228
        # Weave.get doesn't trap this anymore
 
229
        return 
 
230
 
 
231
        self.assertRaises(WeaveFormatError,
 
232
                          k.get_lines,
 
233
                          0)        
 
234
 
 
235
 
 
236
class CannedDelete(TestBase):
 
237
    """Unpack canned weave with deleted lines."""
 
238
    def runTest(self):
 
239
        k = Weave()
 
240
 
 
241
        k._parents = [(),
 
242
                frozenset([0]),
 
243
                ]
 
244
        k._weave = [('{', 0),
 
245
                'first line',
 
246
                ('[', 1),
 
247
                'line to be deleted',
 
248
                (']', 1),
 
249
                'last line',
 
250
                ('}', 0),
 
251
                ]
 
252
        k._sha1s = [sha_string('first lineline to be deletedlast line')
 
253
                  , sha_string('first linelast line')]
 
254
 
 
255
        self.assertEqual(k.get_lines(0),
 
256
                         ['first line',
 
257
                          'line to be deleted',
 
258
                          'last line',
 
259
                          ])
 
260
 
 
261
        self.assertEqual(k.get_lines(1),
 
262
                         ['first line',
 
263
                          'last line',
 
264
                          ])
 
265
 
 
266
 
 
267
class CannedReplacement(TestBase):
 
268
    """Unpack canned weave with deleted lines."""
 
269
    def runTest(self):
 
270
        k = Weave()
 
271
 
 
272
        k._parents = [frozenset(),
 
273
                frozenset([0]),
 
274
                ]
 
275
        k._weave = [('{', 0),
 
276
                'first line',
 
277
                ('[', 1),
 
278
                'line to be deleted',
 
279
                (']', 1),
 
280
                ('{', 1),
 
281
                'replacement line',                
 
282
                ('}', 1),
 
283
                'last line',
 
284
                ('}', 0),
 
285
                ]
 
286
        k._sha1s = [sha_string('first lineline to be deletedlast line')
 
287
                  , sha_string('first linereplacement linelast line')]
 
288
 
 
289
        self.assertEqual(k.get_lines(0),
 
290
                         ['first line',
 
291
                          'line to be deleted',
 
292
                          'last line',
 
293
                          ])
 
294
 
 
295
        self.assertEqual(k.get_lines(1),
 
296
                         ['first line',
 
297
                          'replacement line',
 
298
                          'last line',
 
299
                          ])
 
300
 
 
301
 
 
302
class BadWeave(TestBase):
 
303
    """Test that we trap an insert which should not occur."""
 
304
    def runTest(self):
 
305
        k = Weave()
 
306
 
 
307
        k._parents = [frozenset(),
 
308
                ]
 
309
        k._weave = ['bad line',
 
310
                ('{', 0),
 
311
                'foo {',
 
312
                ('{', 1),
 
313
                '  added in version 1',
 
314
                ('{', 2),
 
315
                '  added in v2',
 
316
                ('}', 2),
 
317
                '  also from v1',
 
318
                ('}', 1),
 
319
                '}',
 
320
                ('}', 0)]
 
321
 
 
322
        ################################### SKIPPED
 
323
        # Weave.get doesn't trap this anymore
 
324
        return 
 
325
 
 
326
 
 
327
        self.assertRaises(WeaveFormatError,
 
328
                          k.get,
 
329
                          0)
 
330
 
 
331
 
 
332
class BadInsert(TestBase):
 
333
    """Test that we trap an insert which should not occur."""
 
334
    def runTest(self):
 
335
        k = Weave()
 
336
 
 
337
        k._parents = [frozenset(),
 
338
                frozenset([0]),
 
339
                frozenset([0]),
 
340
                frozenset([0,1,2]),
 
341
                ]
 
342
        k._weave = [('{', 0),
 
343
                'foo {',
 
344
                ('{', 1),
 
345
                '  added in version 1',
 
346
                ('{', 1),
 
347
                '  more in 1',
 
348
                ('}', 1),
 
349
                ('}', 1),
 
350
                ('}', 0)]
 
351
 
 
352
 
 
353
        # this is not currently enforced by get
 
354
        return  ##########################################
 
355
 
 
356
        self.assertRaises(WeaveFormatError,
 
357
                          k.get,
 
358
                          0)
 
359
 
 
360
        self.assertRaises(WeaveFormatError,
 
361
                          k.get,
 
362
                          1)
 
363
 
 
364
 
 
365
class InsertNested(TestBase):
 
366
    """Insertion with nested instructions."""
 
367
    def runTest(self):
 
368
        k = Weave()
 
369
 
 
370
        k._parents = [frozenset(),
 
371
                frozenset([0]),
 
372
                frozenset([0]),
 
373
                frozenset([0,1,2]),
 
374
                ]
 
375
        k._weave = [('{', 0),
 
376
                'foo {',
 
377
                ('{', 1),
 
378
                '  added in version 1',
 
379
                ('{', 2),
 
380
                '  added in v2',
 
381
                ('}', 2),
 
382
                '  also from v1',
 
383
                ('}', 1),
 
384
                '}',
 
385
                ('}', 0)]
 
386
 
 
387
        k._sha1s = [sha_string('foo {}')
 
388
                  , sha_string('foo {  added in version 1  also from v1}')
 
389
                  , sha_string('foo {  added in v2}')
 
390
                  , sha_string('foo {  added in version 1  added in v2  also from v1}')
 
391
                  ]
 
392
 
 
393
        self.assertEqual(k.get_lines(0),
 
394
                         ['foo {',
 
395
                          '}'])
 
396
 
 
397
        self.assertEqual(k.get_lines(1),
 
398
                         ['foo {',
 
399
                          '  added in version 1',
 
400
                          '  also from v1',
 
401
                          '}'])
 
402
                       
 
403
        self.assertEqual(k.get_lines(2),
 
404
                         ['foo {',
 
405
                          '  added in v2',
 
406
                          '}'])
 
407
 
 
408
        self.assertEqual(k.get_lines(3),
 
409
                         ['foo {',
 
410
                          '  added in version 1',
 
411
                          '  added in v2',
 
412
                          '  also from v1',
 
413
                          '}'])
 
414
                         
 
415
 
 
416
class DeleteLines2(TestBase):
 
417
    """Test recording revisions that delete lines.
 
418
 
 
419
    This relies on the weave having a way to represent lines knocked
 
420
    out by a later revision."""
 
421
    def runTest(self):
 
422
        k = Weave()
 
423
 
 
424
        k.add_lines('text0', [], ["line the first",
 
425
                   "line 2",
 
426
                   "line 3",
 
427
                   "fine"])
 
428
 
 
429
        self.assertEqual(len(k.get_lines(0)), 4)
 
430
 
 
431
        k.add_lines('text1', ['text0'], ["line the first",
 
432
                   "fine"])
 
433
 
 
434
        self.assertEqual(k.get_lines(1),
 
435
                         ["line the first",
 
436
                          "fine"])
 
437
 
 
438
        self.assertEqual(k.annotate('text1'),
 
439
                         [('text0', "line the first"),
 
440
                          ('text0', "fine")])
 
441
 
 
442
 
 
443
class IncludeVersions(TestBase):
 
444
    """Check texts that are stored across multiple revisions.
 
445
 
 
446
    Here we manually create a weave with particular encoding and make
 
447
    sure it unpacks properly.
 
448
 
 
449
    Text 0 includes nothing; text 1 includes text 0 and adds some
 
450
    lines.
 
451
    """
 
452
 
 
453
    def runTest(self):
 
454
        k = Weave()
 
455
 
 
456
        k._parents = [frozenset(), frozenset([0])]
 
457
        k._weave = [('{', 0),
 
458
                "first line",
 
459
                ('}', 0),
 
460
                ('{', 1),
 
461
                "second line",
 
462
                ('}', 1)]
 
463
 
 
464
        k._sha1s = [sha_string('first line')
 
465
                  , sha_string('first linesecond line')]
 
466
 
 
467
        self.assertEqual(k.get_lines(1),
 
468
                         ["first line",
 
469
                          "second line"])
 
470
 
 
471
        self.assertEqual(k.get_lines(0),
 
472
                         ["first line"])
 
473
 
 
474
 
 
475
class DivergedIncludes(TestBase):
 
476
    """Weave with two diverged texts based on version 0.
 
477
    """
 
478
    def runTest(self):
 
479
        # FIXME make the weave, dont poke at it.
 
480
        k = Weave()
 
481
 
 
482
        k._names = ['0', '1', '2']
 
483
        k._name_map = {'0':0, '1':1, '2':2}
 
484
        k._parents = [frozenset(),
 
485
                frozenset([0]),
 
486
                frozenset([0]),
 
487
                ]
 
488
        k._weave = [('{', 0),
 
489
                "first line",
 
490
                ('}', 0),
 
491
                ('{', 1),
 
492
                "second line",
 
493
                ('}', 1),
 
494
                ('{', 2),
 
495
                "alternative second line",
 
496
                ('}', 2),                
 
497
                ]
 
498
 
 
499
        k._sha1s = [sha_string('first line')
 
500
                  , sha_string('first linesecond line')
 
501
                  , sha_string('first linealternative second line')]
 
502
 
 
503
        self.assertEqual(k.get_lines(0),
 
504
                         ["first line"])
 
505
 
 
506
        self.assertEqual(k.get_lines(1),
 
507
                         ["first line",
 
508
                          "second line"])
 
509
 
 
510
        self.assertEqual(k.get_lines('2'),
 
511
                         ["first line",
 
512
                          "alternative second line"])
 
513
 
 
514
        self.assertEqual(list(k.get_ancestry(['2'])),
 
515
                         ['0', '2'])
 
516
 
 
517
 
 
518
class ReplaceLine(TestBase):
 
519
    def runTest(self):
 
520
        k = Weave()
 
521
 
 
522
        text0 = ['cheddar', 'stilton', 'gruyere']
 
523
        text1 = ['cheddar', 'blue vein', 'neufchatel', 'chevre']
 
524
        
 
525
        k.add_lines('text0', [], text0)
 
526
        k.add_lines('text1', ['text0'], text1)
 
527
 
 
528
        self.log('k._weave=' + pformat(k._weave))
 
529
 
 
530
        self.assertEqual(k.get_lines(0), text0)
 
531
        self.assertEqual(k.get_lines(1), text1)
 
532
 
 
533
 
 
534
class Merge(TestBase):
 
535
    """Storage of versions that merge diverged parents"""
 
536
    def runTest(self):
 
537
        k = Weave()
 
538
 
 
539
        texts = [['header'],
 
540
                 ['header', '', 'line from 1'],
 
541
                 ['header', '', 'line from 2', 'more from 2'],
 
542
                 ['header', '', 'line from 1', 'fixup line', 'line from 2'],
 
543
                 ]
 
544
 
 
545
        k.add_lines('text0', [], texts[0])
 
546
        k.add_lines('text1', ['text0'], texts[1])
 
547
        k.add_lines('text2', ['text0'], texts[2])
 
548
        k.add_lines('merge', ['text0', 'text1', 'text2'], texts[3])
 
549
 
 
550
        for i, t in enumerate(texts):
 
551
            self.assertEqual(k.get_lines(i), t)
 
552
 
 
553
        self.assertEqual(k.annotate('merge'),
 
554
                         [('text0', 'header'),
 
555
                          ('text1', ''),
 
556
                          ('text1', 'line from 1'),
 
557
                          ('merge', 'fixup line'),
 
558
                          ('text2', 'line from 2'),
 
559
                          ])
 
560
 
 
561
        self.assertEqual(list(k.get_ancestry(['merge'])),
 
562
                         ['text0', 'text1', 'text2', 'merge'])
 
563
 
 
564
        self.log('k._weave=' + pformat(k._weave))
 
565
 
 
566
        self.check_read_write(k)
 
567
 
 
568
 
 
569
class Conflicts(TestBase):
 
570
    """Test detection of conflicting regions during a merge.
 
571
 
 
572
    A base version is inserted, then two descendents try to
 
573
    insert different lines in the same place.  These should be
 
574
    reported as a possible conflict and forwarded to the user."""
 
575
    def runTest(self):
 
576
        return  # NOT RUN
 
577
        k = Weave()
 
578
 
 
579
        k.add_lines([], ['aaa', 'bbb'])
 
580
        k.add_lines([0], ['aaa', '111', 'bbb'])
 
581
        k.add_lines([1], ['aaa', '222', 'bbb'])
 
582
 
 
583
        merged = k.merge([1, 2])
 
584
 
 
585
        self.assertEquals([[['aaa']],
 
586
                           [['111'], ['222']],
 
587
                           [['bbb']]])
 
588
 
 
589
 
 
590
class NonConflict(TestBase):
 
591
    """Two descendants insert compatible changes.
 
592
 
 
593
    No conflict should be reported."""
 
594
    def runTest(self):
 
595
        return  # NOT RUN
 
596
        k = Weave()
 
597
 
 
598
        k.add_lines([], ['aaa', 'bbb'])
 
599
        k.add_lines([0], ['111', 'aaa', 'ccc', 'bbb'])
 
600
        k.add_lines([1], ['aaa', 'ccc', 'bbb', '222'])
 
601
 
 
602
 
 
603
class Khayyam(TestBase):
 
604
    """Test changes to multi-line texts, and read/write"""
 
605
 
 
606
    def test_multi_line_merge(self):
 
607
        rawtexts = [
 
608
            """A Book of Verses underneath the Bough,
 
609
            A Jug of Wine, a Loaf of Bread, -- and Thou
 
610
            Beside me singing in the Wilderness --
 
611
            Oh, Wilderness were Paradise enow!""",
 
612
            
 
613
            """A Book of Verses underneath the Bough,
 
614
            A Jug of Wine, a Loaf of Bread, -- and Thou
 
615
            Beside me singing in the Wilderness --
 
616
            Oh, Wilderness were Paradise now!""",
 
617
 
 
618
            """A Book of poems underneath the tree,
 
619
            A Jug of Wine, a Loaf of Bread,
 
620
            and Thou
 
621
            Beside me singing in the Wilderness --
 
622
            Oh, Wilderness were Paradise now!
 
623
 
 
624
            -- O. Khayyam""",
 
625
 
 
626
            """A Book of Verses underneath the Bough,
 
627
            A Jug of Wine, a Loaf of Bread,
 
628
            and Thou
 
629
            Beside me singing in the Wilderness --
 
630
            Oh, Wilderness were Paradise now!""",
 
631
            ]
 
632
        texts = [[l.strip() for l in t.split('\n')] for t in rawtexts]
 
633
 
 
634
        k = Weave()
 
635
        parents = set()
 
636
        i = 0
 
637
        for t in texts:
 
638
            ver = k.add_lines('text%d' % i,
 
639
                        list(parents), t)
 
640
            parents.add('text%d' % i)
 
641
            i += 1
 
642
 
 
643
        self.log("k._weave=" + pformat(k._weave))
 
644
 
 
645
        for i, t in enumerate(texts):
 
646
            self.assertEqual(k.get_lines(i), t)
 
647
 
 
648
        self.check_read_write(k)
 
649
 
 
650
 
 
651
class JoinWeavesTests(TestBase):
 
652
    def setUp(self):
 
653
        super(JoinWeavesTests, self).setUp()
 
654
        self.weave1 = Weave()
 
655
        self.lines1 = ['hello\n']
 
656
        self.lines3 = ['hello\n', 'cruel\n', 'world\n']
 
657
        self.weave1.add_lines('v1', [], self.lines1)
 
658
        self.weave1.add_lines('v2', ['v1'], ['hello\n', 'world\n'])
 
659
        self.weave1.add_lines('v3', ['v2'], self.lines3)
 
660
        
 
661
    def test_join_empty(self):
 
662
        """Join two empty weaves."""
 
663
        eq = self.assertEqual
 
664
        w1 = Weave()
 
665
        w2 = Weave()
 
666
        w1.join(w2)
 
667
        eq(len(w1), 0)
 
668
        
 
669
    def test_join_empty_to_nonempty(self):
 
670
        """Join empty weave onto nonempty."""
 
671
        self.weave1.join(Weave())
 
672
        self.assertEqual(len(self.weave1), 3)
 
673
 
 
674
    def test_join_unrelated(self):
 
675
        """Join two weaves with no history in common."""
 
676
        wb = Weave()
 
677
        wb.add_lines('b1', [], ['line from b\n'])
 
678
        w1 = self.weave1
 
679
        w1.join(wb)
 
680
        eq = self.assertEqual
 
681
        eq(len(w1), 4)
 
682
        eq(sorted(w1.versions()),
 
683
           ['b1', 'v1', 'v2', 'v3'])
 
684
 
 
685
    def test_join_related(self):
 
686
        wa = self.weave1.copy()
 
687
        wb = self.weave1.copy()
 
688
        wa.add_lines('a1', ['v3'], ['hello\n', 'sweet\n', 'world\n'])
 
689
        wb.add_lines('b1', ['v3'], ['hello\n', 'pale blue\n', 'world\n'])
 
690
        eq = self.assertEquals
 
691
        eq(len(wa), 4)
 
692
        eq(len(wb), 4)
 
693
        wa.join(wb)
 
694
        eq(len(wa), 5)
 
695
        eq(wa.get_lines('b1'),
 
696
           ['hello\n', 'pale blue\n', 'world\n'])
 
697
 
 
698
    def test_join_text_disagreement(self):
 
699
        """Cannot join weaves with different texts for a version."""
 
700
        wa = Weave()
 
701
        wb = Weave()
 
702
        wa.add_lines('v1', [], ['hello\n'])
 
703
        wb.add_lines('v1', [], ['not\n', 'hello\n'])
 
704
        self.assertRaises(WeaveError,
 
705
                          wa.join, wb)
 
706
 
 
707
    def test_join_unordered(self):
 
708
        """Join weaves where indexes differ.
 
709
        
 
710
        The source weave contains a different version at index 0."""
 
711
        wa = self.weave1.copy()
 
712
        wb = Weave()
 
713
        wb.add_lines('x1', [], ['line from x1\n'])
 
714
        wb.add_lines('v1', [], ['hello\n'])
 
715
        wb.add_lines('v2', ['v1'], ['hello\n', 'world\n'])
 
716
        wa.join(wb)
 
717
        eq = self.assertEquals
 
718
        eq(sorted(wa.versions()), ['v1', 'v2', 'v3', 'x1',])
 
719
        eq(wa.get_text('x1'), 'line from x1\n')
 
720
 
 
721
    def test_written_detection(self):
 
722
        # Test detection of weave file corruption.
 
723
        #
 
724
        # Make sure that we can detect if a weave file has
 
725
        # been corrupted. This doesn't test all forms of corruption,
 
726
        # but it at least helps verify the data you get, is what you want.
 
727
        from cStringIO import StringIO
 
728
 
 
729
        w = Weave()
 
730
        w.add_lines('v1', [], ['hello\n'])
 
731
        w.add_lines('v2', ['v1'], ['hello\n', 'there\n'])
 
732
 
 
733
        tmpf = StringIO()
 
734
        write_weave(w, tmpf)
 
735
 
 
736
        # Because we are corrupting, we need to make sure we have the exact text
 
737
        self.assertEquals('# bzr weave file v5\n'
 
738
                          'i\n1 f572d396fae9206628714fb2ce00f72e94f2258f\nn v1\n\n'
 
739
                          'i 0\n1 90f265c6e75f1c8f9ab76dcf85528352c5f215ef\nn v2\n\n'
 
740
                          'w\n{ 0\n. hello\n}\n{ 1\n. there\n}\nW\n',
 
741
                          tmpf.getvalue())
 
742
 
 
743
        # Change a single letter
 
744
        tmpf = StringIO('# bzr weave file v5\n'
 
745
                        'i\n1 f572d396fae9206628714fb2ce00f72e94f2258f\nn v1\n\n'
 
746
                        'i 0\n1 90f265c6e75f1c8f9ab76dcf85528352c5f215ef\nn v2\n\n'
 
747
                        'w\n{ 0\n. hello\n}\n{ 1\n. There\n}\nW\n')
 
748
 
 
749
        w = read_weave(tmpf)
 
750
 
 
751
        self.assertEqual('hello\n', w.get_text('v1'))
 
752
        self.assertRaises(errors.WeaveInvalidChecksum, w.get_text, 'v2')
 
753
        self.assertRaises(errors.WeaveInvalidChecksum, w.get_lines, 'v2')
 
754
        self.assertRaises(errors.WeaveInvalidChecksum, w.check)
 
755
 
 
756
        # Change the sha checksum
 
757
        tmpf = StringIO('# bzr weave file v5\n'
 
758
                        'i\n1 f572d396fae9206628714fb2ce00f72e94f2258f\nn v1\n\n'
 
759
                        'i 0\n1 f0f265c6e75f1c8f9ab76dcf85528352c5f215ef\nn v2\n\n'
 
760
                        'w\n{ 0\n. hello\n}\n{ 1\n. there\n}\nW\n')
 
761
 
 
762
        w = read_weave(tmpf)
 
763
 
 
764
        self.assertEqual('hello\n', w.get_text('v1'))
 
765
        self.assertRaises(errors.WeaveInvalidChecksum, w.get_text, 'v2')
 
766
        self.assertRaises(errors.WeaveInvalidChecksum, w.get_lines, 'v2')
 
767
        self.assertRaises(errors.WeaveInvalidChecksum, w.check)
 
768
 
 
769
 
 
770
class InstrumentedWeave(Weave):
 
771
    """Keep track of how many times functions are called."""
 
772
    
 
773
    def __init__(self, weave_name=None):
 
774
        self._extract_count = 0
 
775
        Weave.__init__(self, weave_name=weave_name)
 
776
 
 
777
    def _extract(self, versions):
 
778
        self._extract_count += 1
 
779
        return Weave._extract(self, versions)
 
780
 
 
781
 
 
782
class JoinOptimization(TestCase):
 
783
    """Test that Weave.join() doesn't extract all texts, only what must be done."""
 
784
 
 
785
    def test_join(self):
 
786
        w1 = InstrumentedWeave()
 
787
        w2 = InstrumentedWeave()
 
788
 
 
789
        txt0 = ['a\n']
 
790
        txt1 = ['a\n', 'b\n']
 
791
        txt2 = ['a\n', 'c\n']
 
792
        txt3 = ['a\n', 'b\n', 'c\n']
 
793
 
 
794
        w1.add_lines('txt0', [], txt0) # extract 1a
 
795
        w2.add_lines('txt0', [], txt0) # extract 1b
 
796
        w1.add_lines('txt1', ['txt0'], txt1)# extract 2a
 
797
        w2.add_lines('txt2', ['txt0'], txt2)# extract 2b
 
798
        w1.join(w2) # extract 3a to add txt2 
 
799
        w2.join(w1) # extract 3b to add txt1 
 
800
 
 
801
        w1.add_lines('txt3', ['txt1', 'txt2'], txt3) # extract 4a 
 
802
        w2.add_lines('txt3', ['txt2', 'txt1'], txt3) # extract 4b
 
803
        # These secretly have inverted parents
 
804
 
 
805
        # This should not have to do any extractions
 
806
        w1.join(w2) # NO extract, texts already present with same parents
 
807
        w2.join(w1) # NO extract, texts already present with same parents
 
808
 
 
809
        self.assertEqual(4, w1._extract_count)
 
810
        self.assertEqual(4, w2._extract_count)
 
811
 
 
812
    def test_double_parent(self):
 
813
        # It should not be considered illegal to add
 
814
        # a revision with the same parent twice
 
815
        w1 = InstrumentedWeave()
 
816
        w2 = InstrumentedWeave()
 
817
 
 
818
        txt0 = ['a\n']
 
819
        txt1 = ['a\n', 'b\n']
 
820
        txt2 = ['a\n', 'c\n']
 
821
        txt3 = ['a\n', 'b\n', 'c\n']
 
822
 
 
823
        w1.add_lines('txt0', [], txt0)
 
824
        w2.add_lines('txt0', [], txt0)
 
825
        w1.add_lines('txt1', ['txt0'], txt1)
 
826
        w2.add_lines('txt1', ['txt0', 'txt0'], txt1)
 
827
        # Same text, effectively the same, because the
 
828
        # parent is only repeated
 
829
        w1.join(w2) # extract 3a to add txt2 
 
830
        w2.join(w1) # extract 3b to add txt1 
 
831
 
 
832
 
 
833
class TestNeedsReweave(TestCase):
 
834
    """Internal corner cases for when reweave is needed."""
 
835
 
 
836
    def test_compatible_parents(self):
 
837
        w1 = Weave('a')
 
838
        my_parents = set([1, 2, 3])
 
839
        # subsets are ok
 
840
        self.assertTrue(w1._compatible_parents(my_parents, set([3])))
 
841
        # same sets
 
842
        self.assertTrue(w1._compatible_parents(my_parents, set(my_parents)))
 
843
        # same empty corner case
 
844
        self.assertTrue(w1._compatible_parents(set(), set()))
 
845
        # other cannot contain stuff my_parents does not
 
846
        self.assertFalse(w1._compatible_parents(set(), set([1])))
 
847
        self.assertFalse(w1._compatible_parents(my_parents, set([1, 2, 3, 4])))
 
848
        self.assertFalse(w1._compatible_parents(my_parents, set([4])))
 
849
 
 
850
 
 
851
class TestWeaveFile(TestCaseInTempDir):
 
852
    
 
853
    def test_empty_file(self):
 
854
        f = open('empty.weave', 'wb+')
 
855
        try:
 
856
            self.assertRaises(errors.WeaveFormatError,
 
857
                              read_weave, f)
 
858
        finally:
 
859
            f.close()