/brz/remove-bazaar

To get this branch, use:
bzr branch http://gegoxaren.bato24.eu/bzr/brz/remove-bazaar

« back to all changes in this revision

Viewing changes to bzrlib/tests/test_weave.py

  • Committer: Keir Mierle
  • Date: 2007-09-04 19:03:51 UTC
  • mto: This revision was merged to the branch mainline in revision 2824.
  • Revision ID: keir@cs.utoronto.ca-20070904190351-ug32zv9dcfqtluvi
Change ordering of clients listing.

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
# Copyright (C) 2005 Canonical Ltd
 
2
#
 
3
# This program is free software; you can redistribute it and/or modify
 
4
# it under the terms of the GNU General Public License as published by
 
5
# the Free Software Foundation; either version 2 of the License, or
 
6
# (at your option) any later version.
 
7
#
 
8
# This program is distributed in the hope that it will be useful,
 
9
# but WITHOUT ANY WARRANTY; without even the implied warranty of
 
10
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
11
# GNU General Public License for more details.
 
12
#
 
13
# You should have received a copy of the GNU General Public License
 
14
# along with this program; if not, write to the Free Software
 
15
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
 
16
 
 
17
 
 
18
# TODO: tests regarding version names
 
19
# TODO: rbc 20050108 test that join does not leave an inconsistent weave 
 
20
#       if it fails.
 
21
 
 
22
"""test suite for weave algorithm"""
 
23
 
 
24
from pprint import pformat
 
25
 
 
26
from bzrlib import (
 
27
    errors,
 
28
    )
 
29
from bzrlib.osutils import sha_string
 
30
from bzrlib.tests import TestCase, TestCaseInTempDir
 
31
from bzrlib.weave import Weave, WeaveFormatError, WeaveError
 
32
from bzrlib.weavefile import write_weave, read_weave
 
33
 
 
34
 
 
35
# texts for use in testing
 
36
TEXT_0 = ["Hello world"]
 
37
TEXT_1 = ["Hello world",
 
38
          "A second line"]
 
39
 
 
40
 
 
41
class TestBase(TestCase):
 
42
 
 
43
    def check_read_write(self, k):
 
44
        """Check the weave k can be written & re-read."""
 
45
        from tempfile import TemporaryFile
 
46
        tf = TemporaryFile()
 
47
 
 
48
        write_weave(k, tf)
 
49
        tf.seek(0)
 
50
        k2 = read_weave(tf)
 
51
 
 
52
        if k != k2:
 
53
            tf.seek(0)
 
54
            self.log('serialized weave:')
 
55
            self.log(tf.read())
 
56
 
 
57
            self.log('')
 
58
            self.log('parents: %s' % (k._parents == k2._parents))
 
59
            self.log('         %r' % k._parents)
 
60
            self.log('         %r' % k2._parents)
 
61
            self.log('')
 
62
            self.fail('read/write check failed')
 
63
 
 
64
 
 
65
class WeaveContains(TestBase):
 
66
    """Weave __contains__ operator"""
 
67
    def runTest(self):
 
68
        k = Weave()
 
69
        self.assertFalse('foo' in k)
 
70
        k.add_lines('foo', [], TEXT_1)
 
71
        self.assertTrue('foo' in k)
 
72
 
 
73
 
 
74
class Easy(TestBase):
 
75
    def runTest(self):
 
76
        k = Weave()
 
77
 
 
78
 
 
79
class AnnotateOne(TestBase):
 
80
    def runTest(self):
 
81
        k = Weave()
 
82
        k.add_lines('text0', [], TEXT_0)
 
83
        self.assertEqual(k.annotate('text0'),
 
84
                         [('text0', TEXT_0[0])])
 
85
 
 
86
 
 
87
class GetSha1(TestBase):
 
88
    def test_get_sha1(self):
 
89
        k = Weave()
 
90
        k.add_lines('text0', [], 'text0')
 
91
        self.assertEqual('34dc0e430c642a26c3dd1c2beb7a8b4f4445eb79',
 
92
                         k.get_sha1('text0'))
 
93
        self.assertRaises(errors.RevisionNotPresent,
 
94
                          k.get_sha1, 0)
 
95
        self.assertRaises(errors.RevisionNotPresent,
 
96
                          k.get_sha1, 'text1')
 
97
                        
 
98
 
 
99
class InvalidAdd(TestBase):
 
100
    """Try to use invalid version number during add."""
 
101
    def runTest(self):
 
102
        k = Weave()
 
103
 
 
104
        self.assertRaises(errors.RevisionNotPresent,
 
105
                          k.add_lines,
 
106
                          'text0',
 
107
                          ['69'],
 
108
                          ['new text!'])
 
109
 
 
110
 
 
111
class RepeatedAdd(TestBase):
 
112
    """Add the same version twice; harmless."""
 
113
 
 
114
    def test_duplicate_add(self):
 
115
        k = Weave()
 
116
        idx = k.add_lines('text0', [], TEXT_0)
 
117
        idx2 = k.add_lines('text0', [], TEXT_0)
 
118
        self.assertEqual(idx, idx2)
 
119
 
 
120
 
 
121
class InvalidRepeatedAdd(TestBase):
 
122
    def runTest(self):
 
123
        k = Weave()
 
124
        k.add_lines('basis', [], TEXT_0)
 
125
        idx = k.add_lines('text0', [], TEXT_0)
 
126
        self.assertRaises(errors.RevisionAlreadyPresent,
 
127
                          k.add_lines,
 
128
                          'text0',
 
129
                          [],
 
130
                          ['not the same text'])
 
131
        self.assertRaises(errors.RevisionAlreadyPresent,
 
132
                          k.add_lines,
 
133
                          'text0',
 
134
                          ['basis'],         # not the right parents
 
135
                          TEXT_0)
 
136
        
 
137
 
 
138
class InsertLines(TestBase):
 
139
    """Store a revision that adds one line to the original.
 
140
 
 
141
    Look at the annotations to make sure that the first line is matched
 
142
    and not stored repeatedly."""
 
143
    def runTest(self):
 
144
        k = Weave()
 
145
 
 
146
        k.add_lines('text0', [], ['line 1'])
 
147
        k.add_lines('text1', ['text0'], ['line 1', 'line 2'])
 
148
 
 
149
        self.assertEqual(k.annotate('text0'),
 
150
                         [('text0', 'line 1')])
 
151
 
 
152
        self.assertEqual(k.get_lines(1),
 
153
                         ['line 1',
 
154
                          'line 2'])
 
155
 
 
156
        self.assertEqual(k.annotate('text1'),
 
157
                         [('text0', 'line 1'),
 
158
                          ('text1', 'line 2')])
 
159
 
 
160
        k.add_lines('text2', ['text0'], ['line 1', 'diverged line'])
 
161
 
 
162
        self.assertEqual(k.annotate('text2'),
 
163
                         [('text0', 'line 1'),
 
164
                          ('text2', 'diverged line')])
 
165
 
 
166
        text3 = ['line 1', 'middle line', 'line 2']
 
167
        k.add_lines('text3',
 
168
              ['text0', 'text1'],
 
169
              text3)
 
170
 
 
171
        # self.log("changes to text3: " + pformat(list(k._delta(set([0, 1]), text3))))
 
172
 
 
173
        self.log("k._weave=" + pformat(k._weave))
 
174
 
 
175
        self.assertEqual(k.annotate('text3'),
 
176
                         [('text0', 'line 1'),
 
177
                          ('text3', 'middle line'),
 
178
                          ('text1', 'line 2')])
 
179
 
 
180
        # now multiple insertions at different places
 
181
        k.add_lines('text4',
 
182
              ['text0', 'text1', 'text3'],
 
183
              ['line 1', 'aaa', 'middle line', 'bbb', 'line 2', 'ccc'])
 
184
 
 
185
        self.assertEqual(k.annotate('text4'), 
 
186
                         [('text0', 'line 1'),
 
187
                          ('text4', 'aaa'),
 
188
                          ('text3', 'middle line'),
 
189
                          ('text4', 'bbb'),
 
190
                          ('text1', 'line 2'),
 
191
                          ('text4', 'ccc')])
 
192
 
 
193
 
 
194
class DeleteLines(TestBase):
 
195
    """Deletion of lines from existing text.
 
196
 
 
197
    Try various texts all based on a common ancestor."""
 
198
    def runTest(self):
 
199
        k = Weave()
 
200
 
 
201
        base_text = ['one', 'two', 'three', 'four']
 
202
 
 
203
        k.add_lines('text0', [], base_text)
 
204
        
 
205
        texts = [['one', 'two', 'three'],
 
206
                 ['two', 'three', 'four'],
 
207
                 ['one', 'four'],
 
208
                 ['one', 'two', 'three', 'four'],
 
209
                 ]
 
210
 
 
211
        i = 1
 
212
        for t in texts:
 
213
            ver = k.add_lines('text%d' % i,
 
214
                        ['text0'], t)
 
215
            i += 1
 
216
 
 
217
        self.log('final weave:')
 
218
        self.log('k._weave=' + pformat(k._weave))
 
219
 
 
220
        for i in range(len(texts)):
 
221
            self.assertEqual(k.get_lines(i+1),
 
222
                             texts[i])
 
223
 
 
224
 
 
225
class SuicideDelete(TestBase):
 
226
    """Invalid weave which tries to add and delete simultaneously."""
 
227
    def runTest(self):
 
228
        k = Weave()
 
229
 
 
230
        k._parents = [(),
 
231
                ]
 
232
        k._weave = [('{', 0),
 
233
                'first line',
 
234
                ('[', 0),
 
235
                'deleted in 0',
 
236
                (']', 0),
 
237
                ('}', 0),
 
238
                ]
 
239
        ################################### SKIPPED
 
240
        # Weave.get doesn't trap this anymore
 
241
        return 
 
242
 
 
243
        self.assertRaises(WeaveFormatError,
 
244
                          k.get_lines,
 
245
                          0)        
 
246
 
 
247
 
 
248
class CannedDelete(TestBase):
 
249
    """Unpack canned weave with deleted lines."""
 
250
    def runTest(self):
 
251
        k = Weave()
 
252
 
 
253
        k._parents = [(),
 
254
                frozenset([0]),
 
255
                ]
 
256
        k._weave = [('{', 0),
 
257
                'first line',
 
258
                ('[', 1),
 
259
                'line to be deleted',
 
260
                (']', 1),
 
261
                'last line',
 
262
                ('}', 0),
 
263
                ]
 
264
        k._sha1s = [sha_string('first lineline to be deletedlast line')
 
265
                  , sha_string('first linelast line')]
 
266
 
 
267
        self.assertEqual(k.get_lines(0),
 
268
                         ['first line',
 
269
                          'line to be deleted',
 
270
                          'last line',
 
271
                          ])
 
272
 
 
273
        self.assertEqual(k.get_lines(1),
 
274
                         ['first line',
 
275
                          'last line',
 
276
                          ])
 
277
 
 
278
 
 
279
class CannedReplacement(TestBase):
 
280
    """Unpack canned weave with deleted lines."""
 
281
    def runTest(self):
 
282
        k = Weave()
 
283
 
 
284
        k._parents = [frozenset(),
 
285
                frozenset([0]),
 
286
                ]
 
287
        k._weave = [('{', 0),
 
288
                'first line',
 
289
                ('[', 1),
 
290
                'line to be deleted',
 
291
                (']', 1),
 
292
                ('{', 1),
 
293
                'replacement line',                
 
294
                ('}', 1),
 
295
                'last line',
 
296
                ('}', 0),
 
297
                ]
 
298
        k._sha1s = [sha_string('first lineline to be deletedlast line')
 
299
                  , sha_string('first linereplacement linelast line')]
 
300
 
 
301
        self.assertEqual(k.get_lines(0),
 
302
                         ['first line',
 
303
                          'line to be deleted',
 
304
                          'last line',
 
305
                          ])
 
306
 
 
307
        self.assertEqual(k.get_lines(1),
 
308
                         ['first line',
 
309
                          'replacement line',
 
310
                          'last line',
 
311
                          ])
 
312
 
 
313
 
 
314
class BadWeave(TestBase):
 
315
    """Test that we trap an insert which should not occur."""
 
316
    def runTest(self):
 
317
        k = Weave()
 
318
 
 
319
        k._parents = [frozenset(),
 
320
                ]
 
321
        k._weave = ['bad line',
 
322
                ('{', 0),
 
323
                'foo {',
 
324
                ('{', 1),
 
325
                '  added in version 1',
 
326
                ('{', 2),
 
327
                '  added in v2',
 
328
                ('}', 2),
 
329
                '  also from v1',
 
330
                ('}', 1),
 
331
                '}',
 
332
                ('}', 0)]
 
333
 
 
334
        ################################### SKIPPED
 
335
        # Weave.get doesn't trap this anymore
 
336
        return 
 
337
 
 
338
 
 
339
        self.assertRaises(WeaveFormatError,
 
340
                          k.get,
 
341
                          0)
 
342
 
 
343
 
 
344
class BadInsert(TestBase):
 
345
    """Test that we trap an insert which should not occur."""
 
346
    def runTest(self):
 
347
        k = Weave()
 
348
 
 
349
        k._parents = [frozenset(),
 
350
                frozenset([0]),
 
351
                frozenset([0]),
 
352
                frozenset([0,1,2]),
 
353
                ]
 
354
        k._weave = [('{', 0),
 
355
                'foo {',
 
356
                ('{', 1),
 
357
                '  added in version 1',
 
358
                ('{', 1),
 
359
                '  more in 1',
 
360
                ('}', 1),
 
361
                ('}', 1),
 
362
                ('}', 0)]
 
363
 
 
364
 
 
365
        # this is not currently enforced by get
 
366
        return  ##########################################
 
367
 
 
368
        self.assertRaises(WeaveFormatError,
 
369
                          k.get,
 
370
                          0)
 
371
 
 
372
        self.assertRaises(WeaveFormatError,
 
373
                          k.get,
 
374
                          1)
 
375
 
 
376
 
 
377
class InsertNested(TestBase):
 
378
    """Insertion with nested instructions."""
 
379
    def runTest(self):
 
380
        k = Weave()
 
381
 
 
382
        k._parents = [frozenset(),
 
383
                frozenset([0]),
 
384
                frozenset([0]),
 
385
                frozenset([0,1,2]),
 
386
                ]
 
387
        k._weave = [('{', 0),
 
388
                'foo {',
 
389
                ('{', 1),
 
390
                '  added in version 1',
 
391
                ('{', 2),
 
392
                '  added in v2',
 
393
                ('}', 2),
 
394
                '  also from v1',
 
395
                ('}', 1),
 
396
                '}',
 
397
                ('}', 0)]
 
398
 
 
399
        k._sha1s = [sha_string('foo {}')
 
400
                  , sha_string('foo {  added in version 1  also from v1}')
 
401
                  , sha_string('foo {  added in v2}')
 
402
                  , sha_string('foo {  added in version 1  added in v2  also from v1}')
 
403
                  ]
 
404
 
 
405
        self.assertEqual(k.get_lines(0),
 
406
                         ['foo {',
 
407
                          '}'])
 
408
 
 
409
        self.assertEqual(k.get_lines(1),
 
410
                         ['foo {',
 
411
                          '  added in version 1',
 
412
                          '  also from v1',
 
413
                          '}'])
 
414
                       
 
415
        self.assertEqual(k.get_lines(2),
 
416
                         ['foo {',
 
417
                          '  added in v2',
 
418
                          '}'])
 
419
 
 
420
        self.assertEqual(k.get_lines(3),
 
421
                         ['foo {',
 
422
                          '  added in version 1',
 
423
                          '  added in v2',
 
424
                          '  also from v1',
 
425
                          '}'])
 
426
                         
 
427
 
 
428
class DeleteLines2(TestBase):
 
429
    """Test recording revisions that delete lines.
 
430
 
 
431
    This relies on the weave having a way to represent lines knocked
 
432
    out by a later revision."""
 
433
    def runTest(self):
 
434
        k = Weave()
 
435
 
 
436
        k.add_lines('text0', [], ["line the first",
 
437
                   "line 2",
 
438
                   "line 3",
 
439
                   "fine"])
 
440
 
 
441
        self.assertEqual(len(k.get_lines(0)), 4)
 
442
 
 
443
        k.add_lines('text1', ['text0'], ["line the first",
 
444
                   "fine"])
 
445
 
 
446
        self.assertEqual(k.get_lines(1),
 
447
                         ["line the first",
 
448
                          "fine"])
 
449
 
 
450
        self.assertEqual(k.annotate('text1'),
 
451
                         [('text0', "line the first"),
 
452
                          ('text0', "fine")])
 
453
 
 
454
 
 
455
class IncludeVersions(TestBase):
 
456
    """Check texts that are stored across multiple revisions.
 
457
 
 
458
    Here we manually create a weave with particular encoding and make
 
459
    sure it unpacks properly.
 
460
 
 
461
    Text 0 includes nothing; text 1 includes text 0 and adds some
 
462
    lines.
 
463
    """
 
464
 
 
465
    def runTest(self):
 
466
        k = Weave()
 
467
 
 
468
        k._parents = [frozenset(), frozenset([0])]
 
469
        k._weave = [('{', 0),
 
470
                "first line",
 
471
                ('}', 0),
 
472
                ('{', 1),
 
473
                "second line",
 
474
                ('}', 1)]
 
475
 
 
476
        k._sha1s = [sha_string('first line')
 
477
                  , sha_string('first linesecond line')]
 
478
 
 
479
        self.assertEqual(k.get_lines(1),
 
480
                         ["first line",
 
481
                          "second line"])
 
482
 
 
483
        self.assertEqual(k.get_lines(0),
 
484
                         ["first line"])
 
485
 
 
486
 
 
487
class DivergedIncludes(TestBase):
 
488
    """Weave with two diverged texts based on version 0.
 
489
    """
 
490
    def runTest(self):
 
491
        # FIXME make the weave, dont poke at it.
 
492
        k = Weave()
 
493
 
 
494
        k._names = ['0', '1', '2']
 
495
        k._name_map = {'0':0, '1':1, '2':2}
 
496
        k._parents = [frozenset(),
 
497
                frozenset([0]),
 
498
                frozenset([0]),
 
499
                ]
 
500
        k._weave = [('{', 0),
 
501
                "first line",
 
502
                ('}', 0),
 
503
                ('{', 1),
 
504
                "second line",
 
505
                ('}', 1),
 
506
                ('{', 2),
 
507
                "alternative second line",
 
508
                ('}', 2),                
 
509
                ]
 
510
 
 
511
        k._sha1s = [sha_string('first line')
 
512
                  , sha_string('first linesecond line')
 
513
                  , sha_string('first linealternative second line')]
 
514
 
 
515
        self.assertEqual(k.get_lines(0),
 
516
                         ["first line"])
 
517
 
 
518
        self.assertEqual(k.get_lines(1),
 
519
                         ["first line",
 
520
                          "second line"])
 
521
 
 
522
        self.assertEqual(k.get_lines('2'),
 
523
                         ["first line",
 
524
                          "alternative second line"])
 
525
 
 
526
        self.assertEqual(list(k.get_ancestry(['2'])),
 
527
                         ['0', '2'])
 
528
 
 
529
 
 
530
class ReplaceLine(TestBase):
 
531
    def runTest(self):
 
532
        k = Weave()
 
533
 
 
534
        text0 = ['cheddar', 'stilton', 'gruyere']
 
535
        text1 = ['cheddar', 'blue vein', 'neufchatel', 'chevre']
 
536
        
 
537
        k.add_lines('text0', [], text0)
 
538
        k.add_lines('text1', ['text0'], text1)
 
539
 
 
540
        self.log('k._weave=' + pformat(k._weave))
 
541
 
 
542
        self.assertEqual(k.get_lines(0), text0)
 
543
        self.assertEqual(k.get_lines(1), text1)
 
544
 
 
545
 
 
546
class Merge(TestBase):
 
547
    """Storage of versions that merge diverged parents"""
 
548
    def runTest(self):
 
549
        k = Weave()
 
550
 
 
551
        texts = [['header'],
 
552
                 ['header', '', 'line from 1'],
 
553
                 ['header', '', 'line from 2', 'more from 2'],
 
554
                 ['header', '', 'line from 1', 'fixup line', 'line from 2'],
 
555
                 ]
 
556
 
 
557
        k.add_lines('text0', [], texts[0])
 
558
        k.add_lines('text1', ['text0'], texts[1])
 
559
        k.add_lines('text2', ['text0'], texts[2])
 
560
        k.add_lines('merge', ['text0', 'text1', 'text2'], texts[3])
 
561
 
 
562
        for i, t in enumerate(texts):
 
563
            self.assertEqual(k.get_lines(i), t)
 
564
 
 
565
        self.assertEqual(k.annotate('merge'),
 
566
                         [('text0', 'header'),
 
567
                          ('text1', ''),
 
568
                          ('text1', 'line from 1'),
 
569
                          ('merge', 'fixup line'),
 
570
                          ('text2', 'line from 2'),
 
571
                          ])
 
572
 
 
573
        self.assertEqual(list(k.get_ancestry(['merge'])),
 
574
                         ['text0', 'text1', 'text2', 'merge'])
 
575
 
 
576
        self.log('k._weave=' + pformat(k._weave))
 
577
 
 
578
        self.check_read_write(k)
 
579
 
 
580
 
 
581
class Conflicts(TestBase):
 
582
    """Test detection of conflicting regions during a merge.
 
583
 
 
584
    A base version is inserted, then two descendents try to
 
585
    insert different lines in the same place.  These should be
 
586
    reported as a possible conflict and forwarded to the user."""
 
587
    def runTest(self):
 
588
        return  # NOT RUN
 
589
        k = Weave()
 
590
 
 
591
        k.add_lines([], ['aaa', 'bbb'])
 
592
        k.add_lines([0], ['aaa', '111', 'bbb'])
 
593
        k.add_lines([1], ['aaa', '222', 'bbb'])
 
594
 
 
595
        merged = k.merge([1, 2])
 
596
 
 
597
        self.assertEquals([[['aaa']],
 
598
                           [['111'], ['222']],
 
599
                           [['bbb']]])
 
600
 
 
601
 
 
602
class NonConflict(TestBase):
 
603
    """Two descendants insert compatible changes.
 
604
 
 
605
    No conflict should be reported."""
 
606
    def runTest(self):
 
607
        return  # NOT RUN
 
608
        k = Weave()
 
609
 
 
610
        k.add_lines([], ['aaa', 'bbb'])
 
611
        k.add_lines([0], ['111', 'aaa', 'ccc', 'bbb'])
 
612
        k.add_lines([1], ['aaa', 'ccc', 'bbb', '222'])
 
613
 
 
614
 
 
615
class Khayyam(TestBase):
 
616
    """Test changes to multi-line texts, and read/write"""
 
617
 
 
618
    def test_multi_line_merge(self):
 
619
        rawtexts = [
 
620
            """A Book of Verses underneath the Bough,
 
621
            A Jug of Wine, a Loaf of Bread, -- and Thou
 
622
            Beside me singing in the Wilderness --
 
623
            Oh, Wilderness were Paradise enow!""",
 
624
            
 
625
            """A Book of Verses underneath the Bough,
 
626
            A Jug of Wine, a Loaf of Bread, -- and Thou
 
627
            Beside me singing in the Wilderness --
 
628
            Oh, Wilderness were Paradise now!""",
 
629
 
 
630
            """A Book of poems underneath the tree,
 
631
            A Jug of Wine, a Loaf of Bread,
 
632
            and Thou
 
633
            Beside me singing in the Wilderness --
 
634
            Oh, Wilderness were Paradise now!
 
635
 
 
636
            -- O. Khayyam""",
 
637
 
 
638
            """A Book of Verses underneath the Bough,
 
639
            A Jug of Wine, a Loaf of Bread,
 
640
            and Thou
 
641
            Beside me singing in the Wilderness --
 
642
            Oh, Wilderness were Paradise now!""",
 
643
            ]
 
644
        texts = [[l.strip() for l in t.split('\n')] for t in rawtexts]
 
645
 
 
646
        k = Weave()
 
647
        parents = set()
 
648
        i = 0
 
649
        for t in texts:
 
650
            ver = k.add_lines('text%d' % i,
 
651
                        list(parents), t)
 
652
            parents.add('text%d' % i)
 
653
            i += 1
 
654
 
 
655
        self.log("k._weave=" + pformat(k._weave))
 
656
 
 
657
        for i, t in enumerate(texts):
 
658
            self.assertEqual(k.get_lines(i), t)
 
659
 
 
660
        self.check_read_write(k)
 
661
 
 
662
 
 
663
class JoinWeavesTests(TestBase):
 
664
    def setUp(self):
 
665
        super(JoinWeavesTests, self).setUp()
 
666
        self.weave1 = Weave()
 
667
        self.lines1 = ['hello\n']
 
668
        self.lines3 = ['hello\n', 'cruel\n', 'world\n']
 
669
        self.weave1.add_lines('v1', [], self.lines1)
 
670
        self.weave1.add_lines('v2', ['v1'], ['hello\n', 'world\n'])
 
671
        self.weave1.add_lines('v3', ['v2'], self.lines3)
 
672
        
 
673
    def test_join_empty(self):
 
674
        """Join two empty weaves."""
 
675
        eq = self.assertEqual
 
676
        w1 = Weave()
 
677
        w2 = Weave()
 
678
        w1.join(w2)
 
679
        eq(len(w1), 0)
 
680
        
 
681
    def test_join_empty_to_nonempty(self):
 
682
        """Join empty weave onto nonempty."""
 
683
        self.weave1.join(Weave())
 
684
        self.assertEqual(len(self.weave1), 3)
 
685
 
 
686
    def test_join_unrelated(self):
 
687
        """Join two weaves with no history in common."""
 
688
        wb = Weave()
 
689
        wb.add_lines('b1', [], ['line from b\n'])
 
690
        w1 = self.weave1
 
691
        w1.join(wb)
 
692
        eq = self.assertEqual
 
693
        eq(len(w1), 4)
 
694
        eq(sorted(w1.versions()),
 
695
           ['b1', 'v1', 'v2', 'v3'])
 
696
 
 
697
    def test_join_related(self):
 
698
        wa = self.weave1.copy()
 
699
        wb = self.weave1.copy()
 
700
        wa.add_lines('a1', ['v3'], ['hello\n', 'sweet\n', 'world\n'])
 
701
        wb.add_lines('b1', ['v3'], ['hello\n', 'pale blue\n', 'world\n'])
 
702
        eq = self.assertEquals
 
703
        eq(len(wa), 4)
 
704
        eq(len(wb), 4)
 
705
        wa.join(wb)
 
706
        eq(len(wa), 5)
 
707
        eq(wa.get_lines('b1'),
 
708
           ['hello\n', 'pale blue\n', 'world\n'])
 
709
 
 
710
    def test_join_parent_disagreement(self):
 
711
        #join reconciles differening parents into a union.
 
712
        wa = Weave()
 
713
        wb = Weave()
 
714
        wa.add_lines('v1', [], ['hello\n'])
 
715
        wb.add_lines('v0', [], [])
 
716
        wb.add_lines('v1', ['v0'], ['hello\n'])
 
717
        wa.join(wb)
 
718
        self.assertEqual(['v0'], wa.get_parents('v1'))
 
719
 
 
720
    def test_join_text_disagreement(self):
 
721
        """Cannot join weaves with different texts for a version."""
 
722
        wa = Weave()
 
723
        wb = Weave()
 
724
        wa.add_lines('v1', [], ['hello\n'])
 
725
        wb.add_lines('v1', [], ['not\n', 'hello\n'])
 
726
        self.assertRaises(WeaveError,
 
727
                          wa.join, wb)
 
728
 
 
729
    def test_join_unordered(self):
 
730
        """Join weaves where indexes differ.
 
731
        
 
732
        The source weave contains a different version at index 0."""
 
733
        wa = self.weave1.copy()
 
734
        wb = Weave()
 
735
        wb.add_lines('x1', [], ['line from x1\n'])
 
736
        wb.add_lines('v1', [], ['hello\n'])
 
737
        wb.add_lines('v2', ['v1'], ['hello\n', 'world\n'])
 
738
        wa.join(wb)
 
739
        eq = self.assertEquals
 
740
        eq(sorted(wa.versions()), ['v1', 'v2', 'v3', 'x1',])
 
741
        eq(wa.get_text('x1'), 'line from x1\n')
 
742
 
 
743
    def test_written_detection(self):
 
744
        # Test detection of weave file corruption.
 
745
        #
 
746
        # Make sure that we can detect if a weave file has
 
747
        # been corrupted. This doesn't test all forms of corruption,
 
748
        # but it at least helps verify the data you get, is what you want.
 
749
        from cStringIO import StringIO
 
750
 
 
751
        w = Weave()
 
752
        w.add_lines('v1', [], ['hello\n'])
 
753
        w.add_lines('v2', ['v1'], ['hello\n', 'there\n'])
 
754
 
 
755
        tmpf = StringIO()
 
756
        write_weave(w, tmpf)
 
757
 
 
758
        # Because we are corrupting, we need to make sure we have the exact text
 
759
        self.assertEquals('# bzr weave file v5\n'
 
760
                          'i\n1 f572d396fae9206628714fb2ce00f72e94f2258f\nn v1\n\n'
 
761
                          'i 0\n1 90f265c6e75f1c8f9ab76dcf85528352c5f215ef\nn v2\n\n'
 
762
                          'w\n{ 0\n. hello\n}\n{ 1\n. there\n}\nW\n',
 
763
                          tmpf.getvalue())
 
764
 
 
765
        # Change a single letter
 
766
        tmpf = StringIO('# bzr weave file v5\n'
 
767
                        'i\n1 f572d396fae9206628714fb2ce00f72e94f2258f\nn v1\n\n'
 
768
                        'i 0\n1 90f265c6e75f1c8f9ab76dcf85528352c5f215ef\nn v2\n\n'
 
769
                        'w\n{ 0\n. hello\n}\n{ 1\n. There\n}\nW\n')
 
770
 
 
771
        w = read_weave(tmpf)
 
772
 
 
773
        self.assertEqual('hello\n', w.get_text('v1'))
 
774
        self.assertRaises(errors.WeaveInvalidChecksum, w.get_text, 'v2')
 
775
        self.assertRaises(errors.WeaveInvalidChecksum, w.get_lines, 'v2')
 
776
        self.assertRaises(errors.WeaveInvalidChecksum, w.check)
 
777
 
 
778
        # Change the sha checksum
 
779
        tmpf = StringIO('# bzr weave file v5\n'
 
780
                        'i\n1 f572d396fae9206628714fb2ce00f72e94f2258f\nn v1\n\n'
 
781
                        'i 0\n1 f0f265c6e75f1c8f9ab76dcf85528352c5f215ef\nn v2\n\n'
 
782
                        'w\n{ 0\n. hello\n}\n{ 1\n. there\n}\nW\n')
 
783
 
 
784
        w = read_weave(tmpf)
 
785
 
 
786
        self.assertEqual('hello\n', w.get_text('v1'))
 
787
        self.assertRaises(errors.WeaveInvalidChecksum, w.get_text, 'v2')
 
788
        self.assertRaises(errors.WeaveInvalidChecksum, w.get_lines, 'v2')
 
789
        self.assertRaises(errors.WeaveInvalidChecksum, w.check)
 
790
 
 
791
 
 
792
class InstrumentedWeave(Weave):
 
793
    """Keep track of how many times functions are called."""
 
794
    
 
795
    def __init__(self, weave_name=None):
 
796
        self._extract_count = 0
 
797
        Weave.__init__(self, weave_name=weave_name)
 
798
 
 
799
    def _extract(self, versions):
 
800
        self._extract_count += 1
 
801
        return Weave._extract(self, versions)
 
802
 
 
803
 
 
804
class JoinOptimization(TestCase):
 
805
    """Test that Weave.join() doesn't extract all texts, only what must be done."""
 
806
 
 
807
    def test_join(self):
 
808
        w1 = InstrumentedWeave()
 
809
        w2 = InstrumentedWeave()
 
810
 
 
811
        txt0 = ['a\n']
 
812
        txt1 = ['a\n', 'b\n']
 
813
        txt2 = ['a\n', 'c\n']
 
814
        txt3 = ['a\n', 'b\n', 'c\n']
 
815
 
 
816
        w1.add_lines('txt0', [], txt0) # extract 1a
 
817
        w2.add_lines('txt0', [], txt0) # extract 1b
 
818
        w1.add_lines('txt1', ['txt0'], txt1)# extract 2a
 
819
        w2.add_lines('txt2', ['txt0'], txt2)# extract 2b
 
820
        w1.join(w2) # extract 3a to add txt2 
 
821
        w2.join(w1) # extract 3b to add txt1 
 
822
 
 
823
        w1.add_lines('txt3', ['txt1', 'txt2'], txt3) # extract 4a 
 
824
        w2.add_lines('txt3', ['txt2', 'txt1'], txt3) # extract 4b
 
825
        # These secretly have inverted parents
 
826
 
 
827
        # This should not have to do any extractions
 
828
        w1.join(w2) # NO extract, texts already present with same parents
 
829
        w2.join(w1) # NO extract, texts already present with same parents
 
830
 
 
831
        self.assertEqual(4, w1._extract_count)
 
832
        self.assertEqual(4, w2._extract_count)
 
833
 
 
834
    def test_double_parent(self):
 
835
        # It should not be considered illegal to add
 
836
        # a revision with the same parent twice
 
837
        w1 = InstrumentedWeave()
 
838
        w2 = InstrumentedWeave()
 
839
 
 
840
        txt0 = ['a\n']
 
841
        txt1 = ['a\n', 'b\n']
 
842
        txt2 = ['a\n', 'c\n']
 
843
        txt3 = ['a\n', 'b\n', 'c\n']
 
844
 
 
845
        w1.add_lines('txt0', [], txt0)
 
846
        w2.add_lines('txt0', [], txt0)
 
847
        w1.add_lines('txt1', ['txt0'], txt1)
 
848
        w2.add_lines('txt1', ['txt0', 'txt0'], txt1)
 
849
        # Same text, effectively the same, because the
 
850
        # parent is only repeated
 
851
        w1.join(w2) # extract 3a to add txt2 
 
852
        w2.join(w1) # extract 3b to add txt1 
 
853
 
 
854
 
 
855
class TestNeedsReweave(TestCase):
 
856
    """Internal corner cases for when reweave is needed."""
 
857
 
 
858
    def test_compatible_parents(self):
 
859
        w1 = Weave('a')
 
860
        my_parents = set([1, 2, 3])
 
861
        # subsets are ok
 
862
        self.assertTrue(w1._compatible_parents(my_parents, set([3])))
 
863
        # same sets
 
864
        self.assertTrue(w1._compatible_parents(my_parents, set(my_parents)))
 
865
        # same empty corner case
 
866
        self.assertTrue(w1._compatible_parents(set(), set()))
 
867
        # other cannot contain stuff my_parents does not
 
868
        self.assertFalse(w1._compatible_parents(set(), set([1])))
 
869
        self.assertFalse(w1._compatible_parents(my_parents, set([1, 2, 3, 4])))
 
870
        self.assertFalse(w1._compatible_parents(my_parents, set([4])))
 
871
 
 
872
 
 
873
class TestWeaveFile(TestCaseInTempDir):
 
874
    
 
875
    def test_empty_file(self):
 
876
        f = open('empty.weave', 'wb+')
 
877
        try:
 
878
            self.assertRaises(errors.WeaveFormatError,
 
879
                              read_weave, f)
 
880
        finally:
 
881
            f.close()