/brz/remove-bazaar

To get this branch, use:
bzr branch http://gegoxaren.bato24.eu/bzr/brz/remove-bazaar

« back to all changes in this revision

Viewing changes to breezy/tests/test_weave.py

  • Committer: Jelmer Vernooij
  • Date: 2020-05-24 00:39:50 UTC
  • mto: This revision was merged to the branch mainline in revision 7504.
  • Revision ID: jelmer@jelmer.uk-20200524003950-bbc545r76vc5yajg
Add github action.

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
# Copyright (C) 2005-2011, 2016 Canonical Ltd
 
2
#
 
3
# This program is free software; you can redistribute it and/or modify
 
4
# it under the terms of the GNU General Public License as published by
 
5
# the Free Software Foundation; either version 2 of the License, or
 
6
# (at your option) any later version.
 
7
#
 
8
# This program is distributed in the hope that it will be useful,
 
9
# but WITHOUT ANY WARRANTY; without even the implied warranty of
 
10
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
11
# GNU General Public License for more details.
 
12
#
 
13
# You should have received a copy of the GNU General Public License
 
14
# along with this program; if not, write to the Free Software
 
15
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
 
16
 
 
17
 
 
18
# TODO: tests regarding version names
 
19
# TODO: rbc 20050108 test that join does not leave an inconsistent weave
 
20
#       if it fails.
 
21
 
 
22
"""test suite for weave algorithm"""
 
23
 
 
24
from io import BytesIO
 
25
from pprint import pformat
 
26
 
 
27
from .. import (
 
28
    errors,
 
29
    )
 
30
from ..osutils import sha_string
 
31
from . import TestCase, TestCaseInTempDir
 
32
from ..bzr.weave import Weave, WeaveFormatError, WeaveInvalidChecksum
 
33
from ..bzr.weavefile import write_weave, read_weave
 
34
 
 
35
 
 
36
# texts for use in testing
 
37
TEXT_0 = [b"Hello world"]
 
38
TEXT_1 = [b"Hello world",
 
39
          b"A second line"]
 
40
 
 
41
 
 
42
class TestBase(TestCase):
 
43
 
 
44
    def check_read_write(self, k):
 
45
        """Check the weave k can be written & re-read."""
 
46
        from tempfile import TemporaryFile
 
47
        tf = TemporaryFile()
 
48
 
 
49
        write_weave(k, tf)
 
50
        tf.seek(0)
 
51
        k2 = read_weave(tf)
 
52
 
 
53
        if k != k2:
 
54
            tf.seek(0)
 
55
            self.log('serialized weave:')
 
56
            self.log(tf.read())
 
57
 
 
58
            self.log('')
 
59
            self.log('parents: %s' % (k._parents == k2._parents))
 
60
            self.log('         %r' % k._parents)
 
61
            self.log('         %r' % k2._parents)
 
62
            self.log('')
 
63
            self.fail('read/write check failed')
 
64
 
 
65
 
 
66
class WeaveContains(TestBase):
 
67
    """Weave __contains__ operator"""
 
68
 
 
69
    def runTest(self):
 
70
        k = Weave(get_scope=lambda: None)
 
71
        self.assertFalse(b'foo' in k)
 
72
        k.add_lines(b'foo', [], TEXT_1)
 
73
        self.assertTrue(b'foo' in k)
 
74
 
 
75
 
 
76
class Easy(TestBase):
 
77
 
 
78
    def runTest(self):
 
79
        Weave()
 
80
 
 
81
 
 
82
class AnnotateOne(TestBase):
 
83
 
 
84
    def runTest(self):
 
85
        k = Weave()
 
86
        k.add_lines(b'text0', [], TEXT_0)
 
87
        self.assertEqual(k.annotate(b'text0'),
 
88
                         [(b'text0', TEXT_0[0])])
 
89
 
 
90
 
 
91
class InvalidAdd(TestBase):
 
92
    """Try to use invalid version number during add."""
 
93
 
 
94
    def runTest(self):
 
95
        k = Weave()
 
96
 
 
97
        self.assertRaises(errors.RevisionNotPresent,
 
98
                          k.add_lines,
 
99
                          b'text0',
 
100
                          [b'69'],
 
101
                          [b'new text!'])
 
102
 
 
103
 
 
104
class RepeatedAdd(TestBase):
 
105
    """Add the same version twice; harmless."""
 
106
 
 
107
    def test_duplicate_add(self):
 
108
        k = Weave()
 
109
        idx = k.add_lines(b'text0', [], TEXT_0)
 
110
        idx2 = k.add_lines(b'text0', [], TEXT_0)
 
111
        self.assertEqual(idx, idx2)
 
112
 
 
113
 
 
114
class InvalidRepeatedAdd(TestBase):
 
115
 
 
116
    def runTest(self):
 
117
        k = Weave()
 
118
        k.add_lines(b'basis', [], TEXT_0)
 
119
        k.add_lines(b'text0', [], TEXT_0)
 
120
        self.assertRaises(errors.RevisionAlreadyPresent,
 
121
                          k.add_lines,
 
122
                          b'text0',
 
123
                          [],
 
124
                          [b'not the same text'])
 
125
        self.assertRaises(errors.RevisionAlreadyPresent,
 
126
                          k.add_lines,
 
127
                          b'text0',
 
128
                          [b'basis'],         # not the right parents
 
129
                          TEXT_0)
 
130
 
 
131
 
 
132
class InsertLines(TestBase):
 
133
    """Store a revision that adds one line to the original.
 
134
 
 
135
    Look at the annotations to make sure that the first line is matched
 
136
    and not stored repeatedly."""
 
137
 
 
138
    def runTest(self):
 
139
        k = Weave()
 
140
 
 
141
        k.add_lines(b'text0', [], [b'line 1'])
 
142
        k.add_lines(b'text1', [b'text0'], [b'line 1', b'line 2'])
 
143
 
 
144
        self.assertEqual(k.annotate(b'text0'),
 
145
                         [(b'text0', b'line 1')])
 
146
 
 
147
        self.assertEqual(k.get_lines(1),
 
148
                         [b'line 1',
 
149
                          b'line 2'])
 
150
 
 
151
        self.assertEqual(k.annotate(b'text1'),
 
152
                         [(b'text0', b'line 1'),
 
153
                          (b'text1', b'line 2')])
 
154
 
 
155
        k.add_lines(b'text2', [b'text0'], [b'line 1', b'diverged line'])
 
156
 
 
157
        self.assertEqual(k.annotate(b'text2'),
 
158
                         [(b'text0', b'line 1'),
 
159
                          (b'text2', b'diverged line')])
 
160
 
 
161
        text3 = [b'line 1', b'middle line', b'line 2']
 
162
        k.add_lines(b'text3',
 
163
                    [b'text0', b'text1'],
 
164
                    text3)
 
165
 
 
166
        # self.log("changes to text3: " + pformat(list(k._delta(set([0, 1]),
 
167
        # text3))))
 
168
 
 
169
        self.log("k._weave=" + pformat(k._weave))
 
170
 
 
171
        self.assertEqual(k.annotate(b'text3'),
 
172
                         [(b'text0', b'line 1'),
 
173
                          (b'text3', b'middle line'),
 
174
                          (b'text1', b'line 2')])
 
175
 
 
176
        # now multiple insertions at different places
 
177
        k.add_lines(
 
178
            b'text4', [b'text0', b'text1', b'text3'],
 
179
            [b'line 1', b'aaa', b'middle line', b'bbb', b'line 2', b'ccc'])
 
180
 
 
181
        self.assertEqual(k.annotate(b'text4'),
 
182
                         [(b'text0', b'line 1'),
 
183
                          (b'text4', b'aaa'),
 
184
                          (b'text3', b'middle line'),
 
185
                          (b'text4', b'bbb'),
 
186
                          (b'text1', b'line 2'),
 
187
                          (b'text4', b'ccc')])
 
188
 
 
189
 
 
190
class DeleteLines(TestBase):
 
191
    """Deletion of lines from existing text.
 
192
 
 
193
    Try various texts all based on a common ancestor."""
 
194
 
 
195
    def runTest(self):
 
196
        k = Weave()
 
197
 
 
198
        base_text = [b'one', b'two', b'three', b'four']
 
199
 
 
200
        k.add_lines(b'text0', [], base_text)
 
201
 
 
202
        texts = [[b'one', b'two', b'three'],
 
203
                 [b'two', b'three', b'four'],
 
204
                 [b'one', b'four'],
 
205
                 [b'one', b'two', b'three', b'four'],
 
206
                 ]
 
207
 
 
208
        i = 1
 
209
        for t in texts:
 
210
            k.add_lines(b'text%d' % i, [b'text0'], t)
 
211
            i += 1
 
212
 
 
213
        self.log('final weave:')
 
214
        self.log('k._weave=' + pformat(k._weave))
 
215
 
 
216
        for i in range(len(texts)):
 
217
            self.assertEqual(k.get_lines(i + 1),
 
218
                             texts[i])
 
219
 
 
220
 
 
221
class SuicideDelete(TestBase):
 
222
    """Invalid weave which tries to add and delete simultaneously."""
 
223
 
 
224
    def runTest(self):
 
225
        k = Weave()
 
226
 
 
227
        k._parents = [(),
 
228
                      ]
 
229
        k._weave = [(b'{', 0),
 
230
                    b'first line',
 
231
                    (b'[', 0),
 
232
                    b'deleted in 0',
 
233
                    (b']', 0),
 
234
                    (b'}', 0),
 
235
                    ]
 
236
        # SKIPPED
 
237
        # Weave.get doesn't trap this anymore
 
238
        return
 
239
 
 
240
        self.assertRaises(WeaveFormatError,
 
241
                          k.get_lines,
 
242
                          0)
 
243
 
 
244
 
 
245
class CannedDelete(TestBase):
 
246
    """Unpack canned weave with deleted lines."""
 
247
 
 
248
    def runTest(self):
 
249
        k = Weave()
 
250
 
 
251
        k._parents = [(),
 
252
                      frozenset([0]),
 
253
                      ]
 
254
        k._weave = [(b'{', 0),
 
255
                    b'first line',
 
256
                    (b'[', 1),
 
257
                    b'line to be deleted',
 
258
                    (b']', 1),
 
259
                    b'last line',
 
260
                    (b'}', 0),
 
261
                    ]
 
262
        k._sha1s = [
 
263
            sha_string(b'first lineline to be deletedlast line'),
 
264
            sha_string(b'first linelast line')]
 
265
 
 
266
        self.assertEqual(k.get_lines(0),
 
267
                         [b'first line',
 
268
                          b'line to be deleted',
 
269
                          b'last line',
 
270
                          ])
 
271
 
 
272
        self.assertEqual(k.get_lines(1),
 
273
                         [b'first line',
 
274
                          b'last line',
 
275
                          ])
 
276
 
 
277
 
 
278
class CannedReplacement(TestBase):
 
279
    """Unpack canned weave with deleted lines."""
 
280
 
 
281
    def runTest(self):
 
282
        k = Weave()
 
283
 
 
284
        k._parents = [frozenset(),
 
285
                      frozenset([0]),
 
286
                      ]
 
287
        k._weave = [(b'{', 0),
 
288
                    b'first line',
 
289
                    (b'[', 1),
 
290
                    b'line to be deleted',
 
291
                    (b']', 1),
 
292
                    (b'{', 1),
 
293
                    b'replacement line',
 
294
                    (b'}', 1),
 
295
                    b'last line',
 
296
                    (b'}', 0),
 
297
                    ]
 
298
        k._sha1s = [
 
299
            sha_string(b'first lineline to be deletedlast line'),
 
300
            sha_string(b'first linereplacement linelast line')]
 
301
 
 
302
        self.assertEqual(k.get_lines(0),
 
303
                         [b'first line',
 
304
                          b'line to be deleted',
 
305
                          b'last line',
 
306
                          ])
 
307
 
 
308
        self.assertEqual(k.get_lines(1),
 
309
                         [b'first line',
 
310
                          b'replacement line',
 
311
                          b'last line',
 
312
                          ])
 
313
 
 
314
 
 
315
class BadWeave(TestBase):
 
316
    """Test that we trap an insert which should not occur."""
 
317
 
 
318
    def runTest(self):
 
319
        k = Weave()
 
320
 
 
321
        k._parents = [frozenset(),
 
322
                      ]
 
323
        k._weave = [b'bad line',
 
324
                    (b'{', 0),
 
325
                    b'foo {',
 
326
                    (b'{', 1),
 
327
                    b'  added in version 1',
 
328
                    (b'{', 2),
 
329
                    b'  added in v2',
 
330
                    (b'}', 2),
 
331
                    b'  also from v1',
 
332
                    (b'}', 1),
 
333
                    b'}',
 
334
                    (b'}', 0)]
 
335
 
 
336
        # SKIPPED
 
337
        # Weave.get doesn't trap this anymore
 
338
        return
 
339
 
 
340
        self.assertRaises(WeaveFormatError,
 
341
                          k.get,
 
342
                          0)
 
343
 
 
344
 
 
345
class BadInsert(TestBase):
 
346
    """Test that we trap an insert which should not occur."""
 
347
 
 
348
    def runTest(self):
 
349
        k = Weave()
 
350
 
 
351
        k._parents = [frozenset(),
 
352
                      frozenset([0]),
 
353
                      frozenset([0]),
 
354
                      frozenset([0, 1, 2]),
 
355
                      ]
 
356
        k._weave = [(b'{', 0),
 
357
                    b'foo {',
 
358
                    (b'{', 1),
 
359
                    b'  added in version 1',
 
360
                    (b'{', 1),
 
361
                    b'  more in 1',
 
362
                    (b'}', 1),
 
363
                    (b'}', 1),
 
364
                    (b'}', 0)]
 
365
 
 
366
        # this is not currently enforced by get
 
367
        return
 
368
 
 
369
        self.assertRaises(WeaveFormatError,
 
370
                          k.get,
 
371
                          0)
 
372
 
 
373
        self.assertRaises(WeaveFormatError,
 
374
                          k.get,
 
375
                          1)
 
376
 
 
377
 
 
378
class InsertNested(TestBase):
 
379
    """Insertion with nested instructions."""
 
380
 
 
381
    def runTest(self):
 
382
        k = Weave()
 
383
 
 
384
        k._parents = [frozenset(),
 
385
                      frozenset([0]),
 
386
                      frozenset([0]),
 
387
                      frozenset([0, 1, 2]),
 
388
                      ]
 
389
        k._weave = [(b'{', 0),
 
390
                    b'foo {',
 
391
                    (b'{', 1),
 
392
                    b'  added in version 1',
 
393
                    (b'{', 2),
 
394
                    b'  added in v2',
 
395
                    (b'}', 2),
 
396
                    b'  also from v1',
 
397
                    (b'}', 1),
 
398
                    b'}',
 
399
                    (b'}', 0)]
 
400
 
 
401
        k._sha1s = [
 
402
            sha_string(b'foo {}'),
 
403
            sha_string(b'foo {  added in version 1  also from v1}'),
 
404
            sha_string(b'foo {  added in v2}'),
 
405
            sha_string(
 
406
                b'foo {  added in version 1  added in v2  also from v1}')
 
407
            ]
 
408
 
 
409
        self.assertEqual(k.get_lines(0),
 
410
                         [b'foo {',
 
411
                          b'}'])
 
412
 
 
413
        self.assertEqual(k.get_lines(1),
 
414
                         [b'foo {',
 
415
                          b'  added in version 1',
 
416
                          b'  also from v1',
 
417
                          b'}'])
 
418
 
 
419
        self.assertEqual(k.get_lines(2),
 
420
                         [b'foo {',
 
421
                          b'  added in v2',
 
422
                          b'}'])
 
423
 
 
424
        self.assertEqual(k.get_lines(3),
 
425
                         [b'foo {',
 
426
                          b'  added in version 1',
 
427
                          b'  added in v2',
 
428
                          b'  also from v1',
 
429
                          b'}'])
 
430
 
 
431
 
 
432
class DeleteLines2(TestBase):
 
433
    """Test recording revisions that delete lines.
 
434
 
 
435
    This relies on the weave having a way to represent lines knocked
 
436
    out by a later revision."""
 
437
 
 
438
    def runTest(self):
 
439
        k = Weave()
 
440
 
 
441
        k.add_lines(b'text0', [], [b"line the first",
 
442
                                   b"line 2",
 
443
                                   b"line 3",
 
444
                                   b"fine"])
 
445
 
 
446
        self.assertEqual(len(k.get_lines(0)), 4)
 
447
 
 
448
        k.add_lines(b'text1', [b'text0'], [b"line the first",
 
449
                                           b"fine"])
 
450
 
 
451
        self.assertEqual(k.get_lines(1),
 
452
                         [b"line the first",
 
453
                          b"fine"])
 
454
 
 
455
        self.assertEqual(k.annotate(b'text1'),
 
456
                         [(b'text0', b"line the first"),
 
457
                          (b'text0', b"fine")])
 
458
 
 
459
 
 
460
class IncludeVersions(TestBase):
 
461
    """Check texts that are stored across multiple revisions.
 
462
 
 
463
    Here we manually create a weave with particular encoding and make
 
464
    sure it unpacks properly.
 
465
 
 
466
    Text 0 includes nothing; text 1 includes text 0 and adds some
 
467
    lines.
 
468
    """
 
469
 
 
470
    def runTest(self):
 
471
        k = Weave()
 
472
 
 
473
        k._parents = [frozenset(), frozenset([0])]
 
474
        k._weave = [(b'{', 0),
 
475
                    b"first line",
 
476
                    (b'}', 0),
 
477
                    (b'{', 1),
 
478
                    b"second line",
 
479
                    (b'}', 1)]
 
480
 
 
481
        k._sha1s = [sha_string(b'first line'), sha_string(
 
482
            b'first linesecond line')]
 
483
 
 
484
        self.assertEqual(k.get_lines(1),
 
485
                         [b"first line",
 
486
                          b"second line"])
 
487
 
 
488
        self.assertEqual(k.get_lines(0),
 
489
                         [b"first line"])
 
490
 
 
491
 
 
492
class DivergedIncludes(TestBase):
 
493
    """Weave with two diverged texts based on version 0.
 
494
    """
 
495
 
 
496
    def runTest(self):
 
497
        # FIXME make the weave, dont poke at it.
 
498
        k = Weave()
 
499
 
 
500
        k._names = [b'0', b'1', b'2']
 
501
        k._name_map = {b'0': 0, b'1': 1, b'2': 2}
 
502
        k._parents = [frozenset(),
 
503
                      frozenset([0]),
 
504
                      frozenset([0]),
 
505
                      ]
 
506
        k._weave = [(b'{', 0),
 
507
                    b"first line",
 
508
                    (b'}', 0),
 
509
                    (b'{', 1),
 
510
                    b"second line",
 
511
                    (b'}', 1),
 
512
                    (b'{', 2),
 
513
                    b"alternative second line",
 
514
                    (b'}', 2),
 
515
                    ]
 
516
 
 
517
        k._sha1s = [
 
518
            sha_string(b'first line'),
 
519
            sha_string(b'first linesecond line'),
 
520
            sha_string(b'first linealternative second line')]
 
521
 
 
522
        self.assertEqual(k.get_lines(0),
 
523
                         [b"first line"])
 
524
 
 
525
        self.assertEqual(k.get_lines(1),
 
526
                         [b"first line",
 
527
                          b"second line"])
 
528
 
 
529
        self.assertEqual(k.get_lines(b'2'),
 
530
                         [b"first line",
 
531
                          b"alternative second line"])
 
532
 
 
533
        self.assertEqual(list(k.get_ancestry([b'2'])),
 
534
                         [b'0', b'2'])
 
535
 
 
536
 
 
537
class ReplaceLine(TestBase):
 
538
    def runTest(self):
 
539
        k = Weave()
 
540
 
 
541
        text0 = [b'cheddar', b'stilton', b'gruyere']
 
542
        text1 = [b'cheddar', b'blue vein', b'neufchatel', b'chevre']
 
543
 
 
544
        k.add_lines(b'text0', [], text0)
 
545
        k.add_lines(b'text1', [b'text0'], text1)
 
546
 
 
547
        self.log('k._weave=' + pformat(k._weave))
 
548
 
 
549
        self.assertEqual(k.get_lines(0), text0)
 
550
        self.assertEqual(k.get_lines(1), text1)
 
551
 
 
552
 
 
553
class Merge(TestBase):
 
554
    """Storage of versions that merge diverged parents"""
 
555
 
 
556
    def runTest(self):
 
557
        k = Weave()
 
558
 
 
559
        texts = [
 
560
            [b'header'],
 
561
            [b'header', b'', b'line from 1'],
 
562
            [b'header', b'', b'line from 2', b'more from 2'],
 
563
            [b'header', b'', b'line from 1', b'fixup line', b'line from 2'],
 
564
            ]
 
565
 
 
566
        k.add_lines(b'text0', [], texts[0])
 
567
        k.add_lines(b'text1', [b'text0'], texts[1])
 
568
        k.add_lines(b'text2', [b'text0'], texts[2])
 
569
        k.add_lines(b'merge', [b'text0', b'text1', b'text2'], texts[3])
 
570
 
 
571
        for i, t in enumerate(texts):
 
572
            self.assertEqual(k.get_lines(i), t)
 
573
 
 
574
        self.assertEqual(k.annotate(b'merge'),
 
575
                         [(b'text0', b'header'),
 
576
                          (b'text1', b''),
 
577
                          (b'text1', b'line from 1'),
 
578
                          (b'merge', b'fixup line'),
 
579
                          (b'text2', b'line from 2'),
 
580
                          ])
 
581
 
 
582
        self.assertEqual(list(k.get_ancestry([b'merge'])),
 
583
                         [b'text0', b'text1', b'text2', b'merge'])
 
584
 
 
585
        self.log('k._weave=' + pformat(k._weave))
 
586
 
 
587
        self.check_read_write(k)
 
588
 
 
589
 
 
590
class Conflicts(TestBase):
 
591
    """Test detection of conflicting regions during a merge.
 
592
 
 
593
    A base version is inserted, then two descendents try to
 
594
    insert different lines in the same place.  These should be
 
595
    reported as a possible conflict and forwarded to the user."""
 
596
 
 
597
    def runTest(self):
 
598
        return  # NOT RUN
 
599
        k = Weave()
 
600
 
 
601
        k.add_lines([], [b'aaa', b'bbb'])
 
602
        k.add_lines([0], [b'aaa', b'111', b'bbb'])
 
603
        k.add_lines([1], [b'aaa', b'222', b'bbb'])
 
604
 
 
605
        k.merge([1, 2])
 
606
 
 
607
        self.assertEqual([[[b'aaa']],
 
608
                          [[b'111'], [b'222']],
 
609
                          [[b'bbb']]])
 
610
 
 
611
 
 
612
class NonConflict(TestBase):
 
613
    """Two descendants insert compatible changes.
 
614
 
 
615
    No conflict should be reported."""
 
616
 
 
617
    def runTest(self):
 
618
        return  # NOT RUN
 
619
        k = Weave()
 
620
 
 
621
        k.add_lines([], [b'aaa', b'bbb'])
 
622
        k.add_lines([0], [b'111', b'aaa', b'ccc', b'bbb'])
 
623
        k.add_lines([1], [b'aaa', b'ccc', b'bbb', b'222'])
 
624
 
 
625
 
 
626
class Khayyam(TestBase):
 
627
    """Test changes to multi-line texts, and read/write"""
 
628
 
 
629
    def test_multi_line_merge(self):
 
630
        rawtexts = [
 
631
            b"""A Book of Verses underneath the Bough,
 
632
            A Jug of Wine, a Loaf of Bread, -- and Thou
 
633
            Beside me singing in the Wilderness --
 
634
            Oh, Wilderness were Paradise enow!""",
 
635
 
 
636
            b"""A Book of Verses underneath the Bough,
 
637
            A Jug of Wine, a Loaf of Bread, -- and Thou
 
638
            Beside me singing in the Wilderness --
 
639
            Oh, Wilderness were Paradise now!""",
 
640
 
 
641
            b"""A Book of poems underneath the tree,
 
642
            A Jug of Wine, a Loaf of Bread,
 
643
            and Thou
 
644
            Beside me singing in the Wilderness --
 
645
            Oh, Wilderness were Paradise now!
 
646
 
 
647
            -- O. Khayyam""",
 
648
 
 
649
            b"""A Book of Verses underneath the Bough,
 
650
            A Jug of Wine, a Loaf of Bread,
 
651
            and Thou
 
652
            Beside me singing in the Wilderness --
 
653
            Oh, Wilderness were Paradise now!""",
 
654
            ]
 
655
        texts = [[l.strip() for l in t.split(b'\n')] for t in rawtexts]
 
656
 
 
657
        k = Weave()
 
658
        parents = set()
 
659
        i = 0
 
660
        for t in texts:
 
661
            k.add_lines(b'text%d' % i, list(parents), t)
 
662
            parents.add(b'text%d' % i)
 
663
            i += 1
 
664
 
 
665
        self.log("k._weave=" + pformat(k._weave))
 
666
 
 
667
        for i, t in enumerate(texts):
 
668
            self.assertEqual(k.get_lines(i), t)
 
669
 
 
670
        self.check_read_write(k)
 
671
 
 
672
 
 
673
class JoinWeavesTests(TestBase):
 
674
 
 
675
    def setUp(self):
 
676
        super(JoinWeavesTests, self).setUp()
 
677
        self.weave1 = Weave()
 
678
        self.lines1 = [b'hello\n']
 
679
        self.lines3 = [b'hello\n', b'cruel\n', b'world\n']
 
680
        self.weave1.add_lines(b'v1', [], self.lines1)
 
681
        self.weave1.add_lines(b'v2', [b'v1'], [b'hello\n', b'world\n'])
 
682
        self.weave1.add_lines(b'v3', [b'v2'], self.lines3)
 
683
 
 
684
    def test_written_detection(self):
 
685
        # Test detection of weave file corruption.
 
686
        #
 
687
        # Make sure that we can detect if a weave file has
 
688
        # been corrupted. This doesn't test all forms of corruption,
 
689
        # but it at least helps verify the data you get, is what you want.
 
690
 
 
691
        w = Weave()
 
692
        w.add_lines(b'v1', [], [b'hello\n'])
 
693
        w.add_lines(b'v2', [b'v1'], [b'hello\n', b'there\n'])
 
694
 
 
695
        tmpf = BytesIO()
 
696
        write_weave(w, tmpf)
 
697
 
 
698
        # Because we are corrupting, we need to make sure we have the exact
 
699
        # text
 
700
        self.assertEqual(
 
701
            b'# bzr weave file v5\n'
 
702
            b'i\n1 f572d396fae9206628714fb2ce00f72e94f2258f\nn v1\n\n'
 
703
            b'i 0\n1 90f265c6e75f1c8f9ab76dcf85528352c5f215ef\nn v2\n\n'
 
704
            b'w\n{ 0\n. hello\n}\n{ 1\n. there\n}\nW\n',
 
705
            tmpf.getvalue())
 
706
 
 
707
        # Change a single letter
 
708
        tmpf = BytesIO(
 
709
            b'# bzr weave file v5\n'
 
710
            b'i\n1 f572d396fae9206628714fb2ce00f72e94f2258f\nn v1\n\n'
 
711
            b'i 0\n1 90f265c6e75f1c8f9ab76dcf85528352c5f215ef\nn v2\n\n'
 
712
            b'w\n{ 0\n. hello\n}\n{ 1\n. There\n}\nW\n')
 
713
 
 
714
        w = read_weave(tmpf)
 
715
 
 
716
        self.assertEqual(b'hello\n', w.get_text(b'v1'))
 
717
        self.assertRaises(WeaveInvalidChecksum, w.get_text, b'v2')
 
718
        self.assertRaises(WeaveInvalidChecksum, w.get_lines, b'v2')
 
719
        self.assertRaises(WeaveInvalidChecksum, w.check)
 
720
 
 
721
        # Change the sha checksum
 
722
        tmpf = BytesIO(
 
723
            b'# bzr weave file v5\n'
 
724
            b'i\n1 f572d396fae9206628714fb2ce00f72e94f2258f\nn v1\n\n'
 
725
            b'i 0\n1 f0f265c6e75f1c8f9ab76dcf85528352c5f215ef\nn v2\n\n'
 
726
            b'w\n{ 0\n. hello\n}\n{ 1\n. there\n}\nW\n')
 
727
 
 
728
        w = read_weave(tmpf)
 
729
 
 
730
        self.assertEqual(b'hello\n', w.get_text(b'v1'))
 
731
        self.assertRaises(WeaveInvalidChecksum, w.get_text, b'v2')
 
732
        self.assertRaises(WeaveInvalidChecksum, w.get_lines, b'v2')
 
733
        self.assertRaises(WeaveInvalidChecksum, w.check)
 
734
 
 
735
 
 
736
class TestWeave(TestCase):
 
737
 
 
738
    def test_allow_reserved_false(self):
 
739
        w = Weave('name', allow_reserved=False)
 
740
        # Add lines is checked at the WeaveFile level, not at the Weave level
 
741
        w.add_lines(b'name:', [], TEXT_1)
 
742
        # But get_lines is checked at this level
 
743
        self.assertRaises(errors.ReservedId, w.get_lines, b'name:')
 
744
 
 
745
    def test_allow_reserved_true(self):
 
746
        w = Weave('name', allow_reserved=True)
 
747
        w.add_lines(b'name:', [], TEXT_1)
 
748
        self.assertEqual(TEXT_1, w.get_lines(b'name:'))
 
749
 
 
750
 
 
751
class InstrumentedWeave(Weave):
 
752
    """Keep track of how many times functions are called."""
 
753
 
 
754
    def __init__(self, weave_name=None):
 
755
        self._extract_count = 0
 
756
        Weave.__init__(self, weave_name=weave_name)
 
757
 
 
758
    def _extract(self, versions):
 
759
        self._extract_count += 1
 
760
        return Weave._extract(self, versions)
 
761
 
 
762
 
 
763
class TestNeedsReweave(TestCase):
 
764
    """Internal corner cases for when reweave is needed."""
 
765
 
 
766
    def test_compatible_parents(self):
 
767
        w1 = Weave('a')
 
768
        my_parents = {1, 2, 3}
 
769
        # subsets are ok
 
770
        self.assertTrue(w1._compatible_parents(my_parents, {3}))
 
771
        # same sets
 
772
        self.assertTrue(w1._compatible_parents(my_parents, set(my_parents)))
 
773
        # same empty corner case
 
774
        self.assertTrue(w1._compatible_parents(set(), set()))
 
775
        # other cannot contain stuff my_parents does not
 
776
        self.assertFalse(w1._compatible_parents(set(), {1}))
 
777
        self.assertFalse(w1._compatible_parents(my_parents, {1, 2, 3, 4}))
 
778
        self.assertFalse(w1._compatible_parents(my_parents, {4}))
 
779
 
 
780
 
 
781
class TestWeaveFile(TestCaseInTempDir):
 
782
 
 
783
    def test_empty_file(self):
 
784
        with open('empty.weave', 'wb+') as f:
 
785
            self.assertRaises(WeaveFormatError, read_weave, f)