/brz/remove-bazaar

To get this branch, use:
bzr branch http://gegoxaren.bato24.eu/bzr/brz/remove-bazaar

« back to all changes in this revision

Viewing changes to breezy/tests/test_weave.py

  • Committer: Jelmer Vernooij
  • Date: 2020-02-19 23:18:42 UTC
  • mto: (7490.3.4 work)
  • mto: This revision was merged to the branch mainline in revision 7495.
  • Revision ID: jelmer@jelmer.uk-20200219231842-agwjh2db66cpajqg
Consistent return values.

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
# Copyright (C) 2005-2011, 2016 Canonical Ltd
 
2
#
 
3
# This program is free software; you can redistribute it and/or modify
 
4
# it under the terms of the GNU General Public License as published by
 
5
# the Free Software Foundation; either version 2 of the License, or
 
6
# (at your option) any later version.
 
7
#
 
8
# This program is distributed in the hope that it will be useful,
 
9
# but WITHOUT ANY WARRANTY; without even the implied warranty of
 
10
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
11
# GNU General Public License for more details.
 
12
#
 
13
# You should have received a copy of the GNU General Public License
 
14
# along with this program; if not, write to the Free Software
 
15
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
 
16
 
 
17
 
 
18
# TODO: tests regarding version names
 
19
# TODO: rbc 20050108 test that join does not leave an inconsistent weave
 
20
#       if it fails.
 
21
 
 
22
"""test suite for weave algorithm"""
 
23
 
 
24
from pprint import pformat
 
25
 
 
26
from .. import (
 
27
    errors,
 
28
    )
 
29
from ..osutils import sha_string
 
30
from ..sixish import (
 
31
    BytesIO,
 
32
    )
 
33
from . import TestCase, TestCaseInTempDir
 
34
from ..bzr.weave import Weave, WeaveFormatError, WeaveInvalidChecksum
 
35
from ..bzr.weavefile import write_weave, read_weave
 
36
 
 
37
 
 
38
# texts for use in testing
 
39
TEXT_0 = [b"Hello world"]
 
40
TEXT_1 = [b"Hello world",
 
41
          b"A second line"]
 
42
 
 
43
 
 
44
class TestBase(TestCase):
 
45
 
 
46
    def check_read_write(self, k):
 
47
        """Check the weave k can be written & re-read."""
 
48
        from tempfile import TemporaryFile
 
49
        tf = TemporaryFile()
 
50
 
 
51
        write_weave(k, tf)
 
52
        tf.seek(0)
 
53
        k2 = read_weave(tf)
 
54
 
 
55
        if k != k2:
 
56
            tf.seek(0)
 
57
            self.log('serialized weave:')
 
58
            self.log(tf.read())
 
59
 
 
60
            self.log('')
 
61
            self.log('parents: %s' % (k._parents == k2._parents))
 
62
            self.log('         %r' % k._parents)
 
63
            self.log('         %r' % k2._parents)
 
64
            self.log('')
 
65
            self.fail('read/write check failed')
 
66
 
 
67
 
 
68
class WeaveContains(TestBase):
 
69
    """Weave __contains__ operator"""
 
70
 
 
71
    def runTest(self):
 
72
        k = Weave(get_scope=lambda: None)
 
73
        self.assertFalse(b'foo' in k)
 
74
        k.add_lines(b'foo', [], TEXT_1)
 
75
        self.assertTrue(b'foo' in k)
 
76
 
 
77
 
 
78
class Easy(TestBase):
 
79
 
 
80
    def runTest(self):
 
81
        Weave()
 
82
 
 
83
 
 
84
class AnnotateOne(TestBase):
 
85
 
 
86
    def runTest(self):
 
87
        k = Weave()
 
88
        k.add_lines(b'text0', [], TEXT_0)
 
89
        self.assertEqual(k.annotate(b'text0'),
 
90
                         [(b'text0', TEXT_0[0])])
 
91
 
 
92
 
 
93
class InvalidAdd(TestBase):
 
94
    """Try to use invalid version number during add."""
 
95
 
 
96
    def runTest(self):
 
97
        k = Weave()
 
98
 
 
99
        self.assertRaises(errors.RevisionNotPresent,
 
100
                          k.add_lines,
 
101
                          b'text0',
 
102
                          [b'69'],
 
103
                          [b'new text!'])
 
104
 
 
105
 
 
106
class RepeatedAdd(TestBase):
 
107
    """Add the same version twice; harmless."""
 
108
 
 
109
    def test_duplicate_add(self):
 
110
        k = Weave()
 
111
        idx = k.add_lines(b'text0', [], TEXT_0)
 
112
        idx2 = k.add_lines(b'text0', [], TEXT_0)
 
113
        self.assertEqual(idx, idx2)
 
114
 
 
115
 
 
116
class InvalidRepeatedAdd(TestBase):
 
117
 
 
118
    def runTest(self):
 
119
        k = Weave()
 
120
        k.add_lines(b'basis', [], TEXT_0)
 
121
        k.add_lines(b'text0', [], TEXT_0)
 
122
        self.assertRaises(errors.RevisionAlreadyPresent,
 
123
                          k.add_lines,
 
124
                          b'text0',
 
125
                          [],
 
126
                          [b'not the same text'])
 
127
        self.assertRaises(errors.RevisionAlreadyPresent,
 
128
                          k.add_lines,
 
129
                          b'text0',
 
130
                          [b'basis'],         # not the right parents
 
131
                          TEXT_0)
 
132
 
 
133
 
 
134
class InsertLines(TestBase):
 
135
    """Store a revision that adds one line to the original.
 
136
 
 
137
    Look at the annotations to make sure that the first line is matched
 
138
    and not stored repeatedly."""
 
139
 
 
140
    def runTest(self):
 
141
        k = Weave()
 
142
 
 
143
        k.add_lines(b'text0', [], [b'line 1'])
 
144
        k.add_lines(b'text1', [b'text0'], [b'line 1', b'line 2'])
 
145
 
 
146
        self.assertEqual(k.annotate(b'text0'),
 
147
                         [(b'text0', b'line 1')])
 
148
 
 
149
        self.assertEqual(k.get_lines(1),
 
150
                         [b'line 1',
 
151
                          b'line 2'])
 
152
 
 
153
        self.assertEqual(k.annotate(b'text1'),
 
154
                         [(b'text0', b'line 1'),
 
155
                          (b'text1', b'line 2')])
 
156
 
 
157
        k.add_lines(b'text2', [b'text0'], [b'line 1', b'diverged line'])
 
158
 
 
159
        self.assertEqual(k.annotate(b'text2'),
 
160
                         [(b'text0', b'line 1'),
 
161
                          (b'text2', b'diverged line')])
 
162
 
 
163
        text3 = [b'line 1', b'middle line', b'line 2']
 
164
        k.add_lines(b'text3',
 
165
                    [b'text0', b'text1'],
 
166
                    text3)
 
167
 
 
168
        # self.log("changes to text3: " + pformat(list(k._delta(set([0, 1]),
 
169
        # text3))))
 
170
 
 
171
        self.log("k._weave=" + pformat(k._weave))
 
172
 
 
173
        self.assertEqual(k.annotate(b'text3'),
 
174
                         [(b'text0', b'line 1'),
 
175
                          (b'text3', b'middle line'),
 
176
                          (b'text1', b'line 2')])
 
177
 
 
178
        # now multiple insertions at different places
 
179
        k.add_lines(
 
180
            b'text4', [b'text0', b'text1', b'text3'],
 
181
            [b'line 1', b'aaa', b'middle line', b'bbb', b'line 2', b'ccc'])
 
182
 
 
183
        self.assertEqual(k.annotate(b'text4'),
 
184
                         [(b'text0', b'line 1'),
 
185
                          (b'text4', b'aaa'),
 
186
                          (b'text3', b'middle line'),
 
187
                          (b'text4', b'bbb'),
 
188
                          (b'text1', b'line 2'),
 
189
                          (b'text4', b'ccc')])
 
190
 
 
191
 
 
192
class DeleteLines(TestBase):
 
193
    """Deletion of lines from existing text.
 
194
 
 
195
    Try various texts all based on a common ancestor."""
 
196
 
 
197
    def runTest(self):
 
198
        k = Weave()
 
199
 
 
200
        base_text = [b'one', b'two', b'three', b'four']
 
201
 
 
202
        k.add_lines(b'text0', [], base_text)
 
203
 
 
204
        texts = [[b'one', b'two', b'three'],
 
205
                 [b'two', b'three', b'four'],
 
206
                 [b'one', b'four'],
 
207
                 [b'one', b'two', b'three', b'four'],
 
208
                 ]
 
209
 
 
210
        i = 1
 
211
        for t in texts:
 
212
            k.add_lines(b'text%d' % i, [b'text0'], t)
 
213
            i += 1
 
214
 
 
215
        self.log('final weave:')
 
216
        self.log('k._weave=' + pformat(k._weave))
 
217
 
 
218
        for i in range(len(texts)):
 
219
            self.assertEqual(k.get_lines(i + 1),
 
220
                             texts[i])
 
221
 
 
222
 
 
223
class SuicideDelete(TestBase):
 
224
    """Invalid weave which tries to add and delete simultaneously."""
 
225
 
 
226
    def runTest(self):
 
227
        k = Weave()
 
228
 
 
229
        k._parents = [(),
 
230
                      ]
 
231
        k._weave = [(b'{', 0),
 
232
                    b'first line',
 
233
                    (b'[', 0),
 
234
                    b'deleted in 0',
 
235
                    (b']', 0),
 
236
                    (b'}', 0),
 
237
                    ]
 
238
        # SKIPPED
 
239
        # Weave.get doesn't trap this anymore
 
240
        return
 
241
 
 
242
        self.assertRaises(WeaveFormatError,
 
243
                          k.get_lines,
 
244
                          0)
 
245
 
 
246
 
 
247
class CannedDelete(TestBase):
 
248
    """Unpack canned weave with deleted lines."""
 
249
 
 
250
    def runTest(self):
 
251
        k = Weave()
 
252
 
 
253
        k._parents = [(),
 
254
                      frozenset([0]),
 
255
                      ]
 
256
        k._weave = [(b'{', 0),
 
257
                    b'first line',
 
258
                    (b'[', 1),
 
259
                    b'line to be deleted',
 
260
                    (b']', 1),
 
261
                    b'last line',
 
262
                    (b'}', 0),
 
263
                    ]
 
264
        k._sha1s = [
 
265
            sha_string(b'first lineline to be deletedlast line'),
 
266
            sha_string(b'first linelast line')]
 
267
 
 
268
        self.assertEqual(k.get_lines(0),
 
269
                         [b'first line',
 
270
                          b'line to be deleted',
 
271
                          b'last line',
 
272
                          ])
 
273
 
 
274
        self.assertEqual(k.get_lines(1),
 
275
                         [b'first line',
 
276
                          b'last line',
 
277
                          ])
 
278
 
 
279
 
 
280
class CannedReplacement(TestBase):
 
281
    """Unpack canned weave with deleted lines."""
 
282
 
 
283
    def runTest(self):
 
284
        k = Weave()
 
285
 
 
286
        k._parents = [frozenset(),
 
287
                      frozenset([0]),
 
288
                      ]
 
289
        k._weave = [(b'{', 0),
 
290
                    b'first line',
 
291
                    (b'[', 1),
 
292
                    b'line to be deleted',
 
293
                    (b']', 1),
 
294
                    (b'{', 1),
 
295
                    b'replacement line',
 
296
                    (b'}', 1),
 
297
                    b'last line',
 
298
                    (b'}', 0),
 
299
                    ]
 
300
        k._sha1s = [
 
301
            sha_string(b'first lineline to be deletedlast line'),
 
302
            sha_string(b'first linereplacement linelast line')]
 
303
 
 
304
        self.assertEqual(k.get_lines(0),
 
305
                         [b'first line',
 
306
                          b'line to be deleted',
 
307
                          b'last line',
 
308
                          ])
 
309
 
 
310
        self.assertEqual(k.get_lines(1),
 
311
                         [b'first line',
 
312
                          b'replacement line',
 
313
                          b'last line',
 
314
                          ])
 
315
 
 
316
 
 
317
class BadWeave(TestBase):
 
318
    """Test that we trap an insert which should not occur."""
 
319
 
 
320
    def runTest(self):
 
321
        k = Weave()
 
322
 
 
323
        k._parents = [frozenset(),
 
324
                      ]
 
325
        k._weave = [b'bad line',
 
326
                    (b'{', 0),
 
327
                    b'foo {',
 
328
                    (b'{', 1),
 
329
                    b'  added in version 1',
 
330
                    (b'{', 2),
 
331
                    b'  added in v2',
 
332
                    (b'}', 2),
 
333
                    b'  also from v1',
 
334
                    (b'}', 1),
 
335
                    b'}',
 
336
                    (b'}', 0)]
 
337
 
 
338
        # SKIPPED
 
339
        # Weave.get doesn't trap this anymore
 
340
        return
 
341
 
 
342
        self.assertRaises(WeaveFormatError,
 
343
                          k.get,
 
344
                          0)
 
345
 
 
346
 
 
347
class BadInsert(TestBase):
 
348
    """Test that we trap an insert which should not occur."""
 
349
 
 
350
    def runTest(self):
 
351
        k = Weave()
 
352
 
 
353
        k._parents = [frozenset(),
 
354
                      frozenset([0]),
 
355
                      frozenset([0]),
 
356
                      frozenset([0, 1, 2]),
 
357
                      ]
 
358
        k._weave = [(b'{', 0),
 
359
                    b'foo {',
 
360
                    (b'{', 1),
 
361
                    b'  added in version 1',
 
362
                    (b'{', 1),
 
363
                    b'  more in 1',
 
364
                    (b'}', 1),
 
365
                    (b'}', 1),
 
366
                    (b'}', 0)]
 
367
 
 
368
        # this is not currently enforced by get
 
369
        return
 
370
 
 
371
        self.assertRaises(WeaveFormatError,
 
372
                          k.get,
 
373
                          0)
 
374
 
 
375
        self.assertRaises(WeaveFormatError,
 
376
                          k.get,
 
377
                          1)
 
378
 
 
379
 
 
380
class InsertNested(TestBase):
 
381
    """Insertion with nested instructions."""
 
382
 
 
383
    def runTest(self):
 
384
        k = Weave()
 
385
 
 
386
        k._parents = [frozenset(),
 
387
                      frozenset([0]),
 
388
                      frozenset([0]),
 
389
                      frozenset([0, 1, 2]),
 
390
                      ]
 
391
        k._weave = [(b'{', 0),
 
392
                    b'foo {',
 
393
                    (b'{', 1),
 
394
                    b'  added in version 1',
 
395
                    (b'{', 2),
 
396
                    b'  added in v2',
 
397
                    (b'}', 2),
 
398
                    b'  also from v1',
 
399
                    (b'}', 1),
 
400
                    b'}',
 
401
                    (b'}', 0)]
 
402
 
 
403
        k._sha1s = [
 
404
            sha_string(b'foo {}'),
 
405
            sha_string(b'foo {  added in version 1  also from v1}'),
 
406
            sha_string(b'foo {  added in v2}'),
 
407
            sha_string(
 
408
                b'foo {  added in version 1  added in v2  also from v1}')
 
409
            ]
 
410
 
 
411
        self.assertEqual(k.get_lines(0),
 
412
                         [b'foo {',
 
413
                          b'}'])
 
414
 
 
415
        self.assertEqual(k.get_lines(1),
 
416
                         [b'foo {',
 
417
                          b'  added in version 1',
 
418
                          b'  also from v1',
 
419
                          b'}'])
 
420
 
 
421
        self.assertEqual(k.get_lines(2),
 
422
                         [b'foo {',
 
423
                          b'  added in v2',
 
424
                          b'}'])
 
425
 
 
426
        self.assertEqual(k.get_lines(3),
 
427
                         [b'foo {',
 
428
                          b'  added in version 1',
 
429
                          b'  added in v2',
 
430
                          b'  also from v1',
 
431
                          b'}'])
 
432
 
 
433
 
 
434
class DeleteLines2(TestBase):
 
435
    """Test recording revisions that delete lines.
 
436
 
 
437
    This relies on the weave having a way to represent lines knocked
 
438
    out by a later revision."""
 
439
 
 
440
    def runTest(self):
 
441
        k = Weave()
 
442
 
 
443
        k.add_lines(b'text0', [], [b"line the first",
 
444
                                   b"line 2",
 
445
                                   b"line 3",
 
446
                                   b"fine"])
 
447
 
 
448
        self.assertEqual(len(k.get_lines(0)), 4)
 
449
 
 
450
        k.add_lines(b'text1', [b'text0'], [b"line the first",
 
451
                                           b"fine"])
 
452
 
 
453
        self.assertEqual(k.get_lines(1),
 
454
                         [b"line the first",
 
455
                          b"fine"])
 
456
 
 
457
        self.assertEqual(k.annotate(b'text1'),
 
458
                         [(b'text0', b"line the first"),
 
459
                          (b'text0', b"fine")])
 
460
 
 
461
 
 
462
class IncludeVersions(TestBase):
 
463
    """Check texts that are stored across multiple revisions.
 
464
 
 
465
    Here we manually create a weave with particular encoding and make
 
466
    sure it unpacks properly.
 
467
 
 
468
    Text 0 includes nothing; text 1 includes text 0 and adds some
 
469
    lines.
 
470
    """
 
471
 
 
472
    def runTest(self):
 
473
        k = Weave()
 
474
 
 
475
        k._parents = [frozenset(), frozenset([0])]
 
476
        k._weave = [(b'{', 0),
 
477
                    b"first line",
 
478
                    (b'}', 0),
 
479
                    (b'{', 1),
 
480
                    b"second line",
 
481
                    (b'}', 1)]
 
482
 
 
483
        k._sha1s = [sha_string(b'first line'), sha_string(
 
484
            b'first linesecond line')]
 
485
 
 
486
        self.assertEqual(k.get_lines(1),
 
487
                         [b"first line",
 
488
                          b"second line"])
 
489
 
 
490
        self.assertEqual(k.get_lines(0),
 
491
                         [b"first line"])
 
492
 
 
493
 
 
494
class DivergedIncludes(TestBase):
 
495
    """Weave with two diverged texts based on version 0.
 
496
    """
 
497
 
 
498
    def runTest(self):
 
499
        # FIXME make the weave, dont poke at it.
 
500
        k = Weave()
 
501
 
 
502
        k._names = [b'0', b'1', b'2']
 
503
        k._name_map = {b'0': 0, b'1': 1, b'2': 2}
 
504
        k._parents = [frozenset(),
 
505
                      frozenset([0]),
 
506
                      frozenset([0]),
 
507
                      ]
 
508
        k._weave = [(b'{', 0),
 
509
                    b"first line",
 
510
                    (b'}', 0),
 
511
                    (b'{', 1),
 
512
                    b"second line",
 
513
                    (b'}', 1),
 
514
                    (b'{', 2),
 
515
                    b"alternative second line",
 
516
                    (b'}', 2),
 
517
                    ]
 
518
 
 
519
        k._sha1s = [
 
520
            sha_string(b'first line'),
 
521
            sha_string(b'first linesecond line'),
 
522
            sha_string(b'first linealternative second line')]
 
523
 
 
524
        self.assertEqual(k.get_lines(0),
 
525
                         [b"first line"])
 
526
 
 
527
        self.assertEqual(k.get_lines(1),
 
528
                         [b"first line",
 
529
                          b"second line"])
 
530
 
 
531
        self.assertEqual(k.get_lines(b'2'),
 
532
                         [b"first line",
 
533
                          b"alternative second line"])
 
534
 
 
535
        self.assertEqual(list(k.get_ancestry([b'2'])),
 
536
                         [b'0', b'2'])
 
537
 
 
538
 
 
539
class ReplaceLine(TestBase):
 
540
    def runTest(self):
 
541
        k = Weave()
 
542
 
 
543
        text0 = [b'cheddar', b'stilton', b'gruyere']
 
544
        text1 = [b'cheddar', b'blue vein', b'neufchatel', b'chevre']
 
545
 
 
546
        k.add_lines(b'text0', [], text0)
 
547
        k.add_lines(b'text1', [b'text0'], text1)
 
548
 
 
549
        self.log('k._weave=' + pformat(k._weave))
 
550
 
 
551
        self.assertEqual(k.get_lines(0), text0)
 
552
        self.assertEqual(k.get_lines(1), text1)
 
553
 
 
554
 
 
555
class Merge(TestBase):
 
556
    """Storage of versions that merge diverged parents"""
 
557
 
 
558
    def runTest(self):
 
559
        k = Weave()
 
560
 
 
561
        texts = [
 
562
            [b'header'],
 
563
            [b'header', b'', b'line from 1'],
 
564
            [b'header', b'', b'line from 2', b'more from 2'],
 
565
            [b'header', b'', b'line from 1', b'fixup line', b'line from 2'],
 
566
            ]
 
567
 
 
568
        k.add_lines(b'text0', [], texts[0])
 
569
        k.add_lines(b'text1', [b'text0'], texts[1])
 
570
        k.add_lines(b'text2', [b'text0'], texts[2])
 
571
        k.add_lines(b'merge', [b'text0', b'text1', b'text2'], texts[3])
 
572
 
 
573
        for i, t in enumerate(texts):
 
574
            self.assertEqual(k.get_lines(i), t)
 
575
 
 
576
        self.assertEqual(k.annotate(b'merge'),
 
577
                         [(b'text0', b'header'),
 
578
                          (b'text1', b''),
 
579
                          (b'text1', b'line from 1'),
 
580
                          (b'merge', b'fixup line'),
 
581
                          (b'text2', b'line from 2'),
 
582
                          ])
 
583
 
 
584
        self.assertEqual(list(k.get_ancestry([b'merge'])),
 
585
                         [b'text0', b'text1', b'text2', b'merge'])
 
586
 
 
587
        self.log('k._weave=' + pformat(k._weave))
 
588
 
 
589
        self.check_read_write(k)
 
590
 
 
591
 
 
592
class Conflicts(TestBase):
 
593
    """Test detection of conflicting regions during a merge.
 
594
 
 
595
    A base version is inserted, then two descendents try to
 
596
    insert different lines in the same place.  These should be
 
597
    reported as a possible conflict and forwarded to the user."""
 
598
 
 
599
    def runTest(self):
 
600
        return  # NOT RUN
 
601
        k = Weave()
 
602
 
 
603
        k.add_lines([], [b'aaa', b'bbb'])
 
604
        k.add_lines([0], [b'aaa', b'111', b'bbb'])
 
605
        k.add_lines([1], [b'aaa', b'222', b'bbb'])
 
606
 
 
607
        k.merge([1, 2])
 
608
 
 
609
        self.assertEqual([[[b'aaa']],
 
610
                          [[b'111'], [b'222']],
 
611
                          [[b'bbb']]])
 
612
 
 
613
 
 
614
class NonConflict(TestBase):
 
615
    """Two descendants insert compatible changes.
 
616
 
 
617
    No conflict should be reported."""
 
618
 
 
619
    def runTest(self):
 
620
        return  # NOT RUN
 
621
        k = Weave()
 
622
 
 
623
        k.add_lines([], [b'aaa', b'bbb'])
 
624
        k.add_lines([0], [b'111', b'aaa', b'ccc', b'bbb'])
 
625
        k.add_lines([1], [b'aaa', b'ccc', b'bbb', b'222'])
 
626
 
 
627
 
 
628
class Khayyam(TestBase):
 
629
    """Test changes to multi-line texts, and read/write"""
 
630
 
 
631
    def test_multi_line_merge(self):
 
632
        rawtexts = [
 
633
            b"""A Book of Verses underneath the Bough,
 
634
            A Jug of Wine, a Loaf of Bread, -- and Thou
 
635
            Beside me singing in the Wilderness --
 
636
            Oh, Wilderness were Paradise enow!""",
 
637
 
 
638
            b"""A Book of Verses underneath the Bough,
 
639
            A Jug of Wine, a Loaf of Bread, -- and Thou
 
640
            Beside me singing in the Wilderness --
 
641
            Oh, Wilderness were Paradise now!""",
 
642
 
 
643
            b"""A Book of poems underneath the tree,
 
644
            A Jug of Wine, a Loaf of Bread,
 
645
            and Thou
 
646
            Beside me singing in the Wilderness --
 
647
            Oh, Wilderness were Paradise now!
 
648
 
 
649
            -- O. Khayyam""",
 
650
 
 
651
            b"""A Book of Verses underneath the Bough,
 
652
            A Jug of Wine, a Loaf of Bread,
 
653
            and Thou
 
654
            Beside me singing in the Wilderness --
 
655
            Oh, Wilderness were Paradise now!""",
 
656
            ]
 
657
        texts = [[l.strip() for l in t.split(b'\n')] for t in rawtexts]
 
658
 
 
659
        k = Weave()
 
660
        parents = set()
 
661
        i = 0
 
662
        for t in texts:
 
663
            k.add_lines(b'text%d' % i, list(parents), t)
 
664
            parents.add(b'text%d' % i)
 
665
            i += 1
 
666
 
 
667
        self.log("k._weave=" + pformat(k._weave))
 
668
 
 
669
        for i, t in enumerate(texts):
 
670
            self.assertEqual(k.get_lines(i), t)
 
671
 
 
672
        self.check_read_write(k)
 
673
 
 
674
 
 
675
class JoinWeavesTests(TestBase):
 
676
 
 
677
    def setUp(self):
 
678
        super(JoinWeavesTests, self).setUp()
 
679
        self.weave1 = Weave()
 
680
        self.lines1 = [b'hello\n']
 
681
        self.lines3 = [b'hello\n', b'cruel\n', b'world\n']
 
682
        self.weave1.add_lines(b'v1', [], self.lines1)
 
683
        self.weave1.add_lines(b'v2', [b'v1'], [b'hello\n', b'world\n'])
 
684
        self.weave1.add_lines(b'v3', [b'v2'], self.lines3)
 
685
 
 
686
    def test_written_detection(self):
 
687
        # Test detection of weave file corruption.
 
688
        #
 
689
        # Make sure that we can detect if a weave file has
 
690
        # been corrupted. This doesn't test all forms of corruption,
 
691
        # but it at least helps verify the data you get, is what you want.
 
692
 
 
693
        w = Weave()
 
694
        w.add_lines(b'v1', [], [b'hello\n'])
 
695
        w.add_lines(b'v2', [b'v1'], [b'hello\n', b'there\n'])
 
696
 
 
697
        tmpf = BytesIO()
 
698
        write_weave(w, tmpf)
 
699
 
 
700
        # Because we are corrupting, we need to make sure we have the exact
 
701
        # text
 
702
        self.assertEqual(
 
703
            b'# bzr weave file v5\n'
 
704
            b'i\n1 f572d396fae9206628714fb2ce00f72e94f2258f\nn v1\n\n'
 
705
            b'i 0\n1 90f265c6e75f1c8f9ab76dcf85528352c5f215ef\nn v2\n\n'
 
706
            b'w\n{ 0\n. hello\n}\n{ 1\n. there\n}\nW\n',
 
707
            tmpf.getvalue())
 
708
 
 
709
        # Change a single letter
 
710
        tmpf = BytesIO(
 
711
            b'# bzr weave file v5\n'
 
712
            b'i\n1 f572d396fae9206628714fb2ce00f72e94f2258f\nn v1\n\n'
 
713
            b'i 0\n1 90f265c6e75f1c8f9ab76dcf85528352c5f215ef\nn v2\n\n'
 
714
            b'w\n{ 0\n. hello\n}\n{ 1\n. There\n}\nW\n')
 
715
 
 
716
        w = read_weave(tmpf)
 
717
 
 
718
        self.assertEqual(b'hello\n', w.get_text(b'v1'))
 
719
        self.assertRaises(WeaveInvalidChecksum, w.get_text, b'v2')
 
720
        self.assertRaises(WeaveInvalidChecksum, w.get_lines, b'v2')
 
721
        self.assertRaises(WeaveInvalidChecksum, w.check)
 
722
 
 
723
        # Change the sha checksum
 
724
        tmpf = BytesIO(
 
725
            b'# bzr weave file v5\n'
 
726
            b'i\n1 f572d396fae9206628714fb2ce00f72e94f2258f\nn v1\n\n'
 
727
            b'i 0\n1 f0f265c6e75f1c8f9ab76dcf85528352c5f215ef\nn v2\n\n'
 
728
            b'w\n{ 0\n. hello\n}\n{ 1\n. there\n}\nW\n')
 
729
 
 
730
        w = read_weave(tmpf)
 
731
 
 
732
        self.assertEqual(b'hello\n', w.get_text(b'v1'))
 
733
        self.assertRaises(WeaveInvalidChecksum, w.get_text, b'v2')
 
734
        self.assertRaises(WeaveInvalidChecksum, w.get_lines, b'v2')
 
735
        self.assertRaises(WeaveInvalidChecksum, w.check)
 
736
 
 
737
 
 
738
class TestWeave(TestCase):
 
739
 
 
740
    def test_allow_reserved_false(self):
 
741
        w = Weave('name', allow_reserved=False)
 
742
        # Add lines is checked at the WeaveFile level, not at the Weave level
 
743
        w.add_lines(b'name:', [], TEXT_1)
 
744
        # But get_lines is checked at this level
 
745
        self.assertRaises(errors.ReservedId, w.get_lines, b'name:')
 
746
 
 
747
    def test_allow_reserved_true(self):
 
748
        w = Weave('name', allow_reserved=True)
 
749
        w.add_lines(b'name:', [], TEXT_1)
 
750
        self.assertEqual(TEXT_1, w.get_lines(b'name:'))
 
751
 
 
752
 
 
753
class InstrumentedWeave(Weave):
 
754
    """Keep track of how many times functions are called."""
 
755
 
 
756
    def __init__(self, weave_name=None):
 
757
        self._extract_count = 0
 
758
        Weave.__init__(self, weave_name=weave_name)
 
759
 
 
760
    def _extract(self, versions):
 
761
        self._extract_count += 1
 
762
        return Weave._extract(self, versions)
 
763
 
 
764
 
 
765
class TestNeedsReweave(TestCase):
 
766
    """Internal corner cases for when reweave is needed."""
 
767
 
 
768
    def test_compatible_parents(self):
 
769
        w1 = Weave('a')
 
770
        my_parents = {1, 2, 3}
 
771
        # subsets are ok
 
772
        self.assertTrue(w1._compatible_parents(my_parents, {3}))
 
773
        # same sets
 
774
        self.assertTrue(w1._compatible_parents(my_parents, set(my_parents)))
 
775
        # same empty corner case
 
776
        self.assertTrue(w1._compatible_parents(set(), set()))
 
777
        # other cannot contain stuff my_parents does not
 
778
        self.assertFalse(w1._compatible_parents(set(), {1}))
 
779
        self.assertFalse(w1._compatible_parents(my_parents, {1, 2, 3, 4}))
 
780
        self.assertFalse(w1._compatible_parents(my_parents, {4}))
 
781
 
 
782
 
 
783
class TestWeaveFile(TestCaseInTempDir):
 
784
 
 
785
    def test_empty_file(self):
 
786
        with open('empty.weave', 'wb+') as f:
 
787
            self.assertRaises(WeaveFormatError, read_weave, f)