1
# Copyright (C) 2005, 2006 Canonical Ltd
3
# This program is free software; you can redistribute it and/or modify
4
# it under the terms of the GNU General Public License as published by
5
# the Free Software Foundation; either version 2 of the License, or
6
# (at your option) any later version.
8
# This program is distributed in the hope that it will be useful,
9
# but WITHOUT ANY WARRANTY; without even the implied warranty of
10
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11
# GNU General Public License for more details.
13
# You should have received a copy of the GNU General Public License
14
# along with this program; if not, write to the Free Software
15
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
17
"""Tests for Knit data structure"""
19
from cStringIO import StringIO
22
from bzrlib.errors import (
23
RevisionAlreadyPresent,
28
from bzrlib.knit import (
36
from bzrlib.osutils import split_lines
37
from bzrlib.tests import TestCase, TestCaseWithTransport
38
from bzrlib.transport import TransportLogger, get_transport
39
from bzrlib.transport.memory import MemoryTransport
40
from bzrlib.weave import Weave
43
class KnitContentTests(TestCase):
45
def test_constructor(self):
46
content = KnitContent([])
49
content = KnitContent([])
50
self.assertEqual(content.text(), [])
52
content = KnitContent([("origin1", "text1"), ("origin2", "text2")])
53
self.assertEqual(content.text(), ["text1", "text2"])
55
def test_annotate(self):
56
content = KnitContent([])
57
self.assertEqual(content.annotate(), [])
59
content = KnitContent([("origin1", "text1"), ("origin2", "text2")])
60
self.assertEqual(content.annotate(),
61
[("origin1", "text1"), ("origin2", "text2")])
63
def test_annotate_iter(self):
64
content = KnitContent([])
65
it = content.annotate_iter()
66
self.assertRaises(StopIteration, it.next)
68
content = KnitContent([("origin1", "text1"), ("origin2", "text2")])
69
it = content.annotate_iter()
70
self.assertEqual(it.next(), ("origin1", "text1"))
71
self.assertEqual(it.next(), ("origin2", "text2"))
72
self.assertRaises(StopIteration, it.next)
75
content = KnitContent([("origin1", "text1"), ("origin2", "text2")])
77
self.assertIsInstance(copy, KnitContent)
78
self.assertEqual(copy.annotate(),
79
[("origin1", "text1"), ("origin2", "text2")])
81
def test_line_delta(self):
82
content1 = KnitContent([("", "a"), ("", "b")])
83
content2 = KnitContent([("", "a"), ("", "a"), ("", "c")])
84
self.assertEqual(content1.line_delta(content2),
85
[(1, 2, 2, [("", "a"), ("", "c")])])
87
def test_line_delta_iter(self):
88
content1 = KnitContent([("", "a"), ("", "b")])
89
content2 = KnitContent([("", "a"), ("", "a"), ("", "c")])
90
it = content1.line_delta_iter(content2)
91
self.assertEqual(it.next(), (1, 2, 2, [("", "a"), ("", "c")]))
92
self.assertRaises(StopIteration, it.next)
95
class MockTransport(object):
97
def __init__(self, file_lines=None):
98
self.file_lines = file_lines
101
def get(self, filename):
102
if self.file_lines is None:
103
raise NoSuchFile(filename)
105
return StringIO("\n".join(self.file_lines))
107
def __getattr__(self, name):
108
def queue_call(*args, **kwargs):
109
self.calls.append((name, args, kwargs))
113
class LowLevelKnitIndexTests(TestCase):
115
def test_no_such_file(self):
116
transport = MockTransport()
118
self.assertRaises(NoSuchFile, _KnitIndex, transport, "filename", "r")
119
self.assertRaises(NoSuchFile, _KnitIndex, transport,
120
"filename", "w", create=False)
122
def test_create_file(self):
123
transport = MockTransport()
125
index = _KnitIndex(transport, "filename", "w",
126
file_mode="wb", create=True)
128
("put_bytes_non_atomic",
129
("filename", index.HEADER), {"mode": "wb"}),
130
transport.calls.pop(0))
132
def test_delay_create_file(self):
133
transport = MockTransport()
135
index = _KnitIndex(transport, "filename", "w",
136
create=True, file_mode="wb", create_parent_dir=True,
137
delay_create=True, dir_mode=0777)
138
self.assertEqual([], transport.calls)
140
index.add_versions([])
141
name, (filename, f), kwargs = transport.calls.pop(0)
142
self.assertEqual("put_file_non_atomic", name)
144
{"dir_mode": 0777, "create_parent_dir": True, "mode": "wb"},
146
self.assertEqual("filename", filename)
147
self.assertEqual(index.HEADER, f.read())
149
index.add_versions([])
150
self.assertEqual(("append_bytes", ("filename", ""), {}),
151
transport.calls.pop(0))
153
def test_read_utf8_version_id(self):
154
transport = MockTransport([
156
u"version-\N{CYRILLIC CAPITAL LETTER A}"
157
u" option 0 1 :".encode("utf-8")
159
index = _KnitIndex(transport, "filename", "r")
161
index.has_version(u"version-\N{CYRILLIC CAPITAL LETTER A}"))
163
def test_read_utf8_parents(self):
164
transport = MockTransport([
166
u"version option 0 1"
167
u" .version-\N{CYRILLIC CAPITAL LETTER A} :".encode("utf-8")
169
index = _KnitIndex(transport, "filename", "r")
170
self.assertEqual([u"version-\N{CYRILLIC CAPITAL LETTER A}"],
171
index.get_parents_with_ghosts("version"))
173
def test_read_ignore_corrupted_lines(self):
174
transport = MockTransport([
177
"corrupted options 0 1 .b .c ",
178
"version options 0 1 :"
180
index = _KnitIndex(transport, "filename", "r")
181
self.assertEqual(1, index.num_versions())
182
self.assertTrue(index.has_version(u"version"))
184
def test_read_corrupted_header(self):
185
transport = MockTransport([])
186
self.assertRaises(KnitHeaderError,
187
_KnitIndex, transport, "filename", "r")
189
def test_read_duplicate_entries(self):
190
transport = MockTransport([
192
"parent options 0 1 :",
193
"version options1 0 1 0 :",
194
"version options2 1 2 .other :",
195
"version options3 3 4 0 .other :"
197
index = _KnitIndex(transport, "filename", "r")
198
self.assertEqual(2, index.num_versions())
199
self.assertEqual(1, index.lookup(u"version"))
200
self.assertEqual((3, 4), index.get_position(u"version"))
201
self.assertEqual(["options3"], index.get_options(u"version"))
202
self.assertEqual([u"parent", u"other"],
203
index.get_parents_with_ghosts(u"version"))
205
def test_read_compressed_parents(self):
206
transport = MockTransport([
210
"c option 0 1 1 0 :",
212
index = _KnitIndex(transport, "filename", "r")
213
self.assertEqual([u"a"], index.get_parents(u"b"))
214
self.assertEqual([u"b", u"a"], index.get_parents(u"c"))
216
def test_write_utf8_version_id(self):
217
transport = MockTransport([
220
index = _KnitIndex(transport, "filename", "r")
221
index.add_version(u"version-\N{CYRILLIC CAPITAL LETTER A}",
222
["option"], 0, 1, [])
223
self.assertEqual(("append_bytes", ("filename",
224
u"\nversion-\N{CYRILLIC CAPITAL LETTER A}"
225
u" option 0 1 :".encode("utf-8")),
227
transport.calls.pop(0))
229
def test_write_utf8_parents(self):
230
transport = MockTransport([
233
index = _KnitIndex(transport, "filename", "r")
234
index.add_version(u"version", ["option"], 0, 1,
235
[u"version-\N{CYRILLIC CAPITAL LETTER A}"])
236
self.assertEqual(("append_bytes", ("filename",
237
u"\nversion option 0 1"
238
u" .version-\N{CYRILLIC CAPITAL LETTER A} :".encode("utf-8")),
240
transport.calls.pop(0))
242
def test_get_graph(self):
243
transport = MockTransport()
244
index = _KnitIndex(transport, "filename", "w", create=True)
245
self.assertEqual([], index.get_graph())
247
index.add_version(u"a", ["option"], 0, 1, [u"b"])
248
self.assertEqual([(u"a", [u"b"])], index.get_graph())
250
index.add_version(u"c", ["option"], 0, 1, [u"d"])
251
self.assertEqual([(u"a", [u"b"]), (u"c", [u"d"])],
252
sorted(index.get_graph()))
254
def test_get_ancestry(self):
255
transport = MockTransport([
258
"b option 0 1 0 .e :",
259
"c option 0 1 1 0 :",
260
"d option 0 1 2 .f :"
262
index = _KnitIndex(transport, "filename", "r")
264
self.assertEqual([], index.get_ancestry([]))
265
self.assertEqual([u"a"], index.get_ancestry([u"a"]))
266
self.assertEqual([u"a", u"b"], index.get_ancestry([u"b"]))
267
self.assertEqual([u"a", u"b", u"c"], index.get_ancestry([u"c"]))
268
self.assertEqual([u"a", u"b", u"c", u"d"], index.get_ancestry([u"d"]))
269
self.assertEqual([u"a", u"b"], index.get_ancestry([u"a", u"b"]))
270
self.assertEqual([u"a", u"b", u"c"], index.get_ancestry([u"a", u"c"]))
272
self.assertRaises(RevisionNotPresent, index.get_ancestry, [u"e"])
274
def test_get_ancestry_with_ghosts(self):
275
transport = MockTransport([
278
"b option 0 1 0 .e :",
279
"c option 0 1 0 .f .g :",
280
"d option 0 1 2 .h .j .k :"
282
index = _KnitIndex(transport, "filename", "r")
284
self.assertEqual([], index.get_ancestry_with_ghosts([]))
285
self.assertEqual([u"a"], index.get_ancestry_with_ghosts([u"a"]))
286
self.assertEqual([u"a", u"e", u"b"],
287
index.get_ancestry_with_ghosts([u"b"]))
288
self.assertEqual([u"a", u"g", u"f", u"c"],
289
index.get_ancestry_with_ghosts([u"c"]))
290
self.assertEqual([u"a", u"g", u"f", u"c", u"k", u"j", u"h", u"d"],
291
index.get_ancestry_with_ghosts([u"d"]))
292
self.assertEqual([u"a", u"e", u"b"],
293
index.get_ancestry_with_ghosts([u"a", u"b"]))
294
self.assertEqual([u"a", u"g", u"f", u"c"],
295
index.get_ancestry_with_ghosts([u"a", u"c"]))
297
[u"a", u"g", u"f", u"c", u"e", u"b", u"k", u"j", u"h", u"d"],
298
index.get_ancestry_with_ghosts([u"b", u"d"]))
300
self.assertRaises(RevisionNotPresent,
301
index.get_ancestry_with_ghosts, [u"e"])
303
def test_num_versions(self):
304
transport = MockTransport([
307
index = _KnitIndex(transport, "filename", "r")
309
self.assertEqual(0, index.num_versions())
310
self.assertEqual(0, len(index))
312
index.add_version(u"a", ["option"], 0, 1, [])
313
self.assertEqual(1, index.num_versions())
314
self.assertEqual(1, len(index))
316
index.add_version(u"a", ["option2"], 1, 2, [])
317
self.assertEqual(1, index.num_versions())
318
self.assertEqual(1, len(index))
320
index.add_version(u"b", ["option"], 0, 1, [])
321
self.assertEqual(2, index.num_versions())
322
self.assertEqual(2, len(index))
324
def test_get_versions(self):
325
transport = MockTransport([
328
index = _KnitIndex(transport, "filename", "r")
330
self.assertEqual([], index.get_versions())
332
index.add_version(u"a", ["option"], 0, 1, [])
333
self.assertEqual([u"a"], index.get_versions())
335
index.add_version(u"a", ["option"], 0, 1, [])
336
self.assertEqual([u"a"], index.get_versions())
338
index.add_version(u"b", ["option"], 0, 1, [])
339
self.assertEqual([u"a", u"b"], index.get_versions())
341
def test_idx_to_name(self):
342
transport = MockTransport([
347
index = _KnitIndex(transport, "filename", "r")
349
self.assertEqual(u"a", index.idx_to_name(0))
350
self.assertEqual(u"b", index.idx_to_name(1))
351
self.assertEqual(u"b", index.idx_to_name(-1))
352
self.assertEqual(u"a", index.idx_to_name(-2))
354
def test_lookup(self):
355
transport = MockTransport([
360
index = _KnitIndex(transport, "filename", "r")
362
self.assertEqual(0, index.lookup(u"a"))
363
self.assertEqual(1, index.lookup(u"b"))
365
def test_add_version(self):
366
transport = MockTransport([
369
index = _KnitIndex(transport, "filename", "r")
371
index.add_version(u"a", ["option"], 0, 1, [u"b"])
372
self.assertEqual(("append_bytes",
373
("filename", "\na option 0 1 .b :"),
374
{}), transport.calls.pop(0))
375
self.assertTrue(index.has_version(u"a"))
376
self.assertEqual(1, index.num_versions())
377
self.assertEqual((0, 1), index.get_position(u"a"))
378
self.assertEqual(["option"], index.get_options(u"a"))
379
self.assertEqual([u"b"], index.get_parents_with_ghosts(u"a"))
381
index.add_version(u"a", ["opt"], 1, 2, [u"c"])
382
self.assertEqual(("append_bytes",
383
("filename", "\na opt 1 2 .c :"),
384
{}), transport.calls.pop(0))
385
self.assertTrue(index.has_version(u"a"))
386
self.assertEqual(1, index.num_versions())
387
self.assertEqual((1, 2), index.get_position(u"a"))
388
self.assertEqual(["opt"], index.get_options(u"a"))
389
self.assertEqual([u"c"], index.get_parents_with_ghosts(u"a"))
391
index.add_version(u"b", ["option"], 2, 3, [u"a"])
392
self.assertEqual(("append_bytes",
393
("filename", "\nb option 2 3 0 :"),
394
{}), transport.calls.pop(0))
395
self.assertTrue(index.has_version(u"b"))
396
self.assertEqual(2, index.num_versions())
397
self.assertEqual((2, 3), index.get_position(u"b"))
398
self.assertEqual(["option"], index.get_options(u"b"))
399
self.assertEqual([u"a"], index.get_parents_with_ghosts(u"b"))
401
def test_add_versions(self):
402
transport = MockTransport([
405
index = _KnitIndex(transport, "filename", "r")
408
(u"a", ["option"], 0, 1, [u"b"]),
409
(u"a", ["opt"], 1, 2, [u"c"]),
410
(u"b", ["option"], 2, 3, [u"a"])
412
self.assertEqual(("append_bytes", ("filename",
413
"\na option 0 1 .b :"
416
), {}), transport.calls.pop(0))
417
self.assertTrue(index.has_version(u"a"))
418
self.assertTrue(index.has_version(u"b"))
419
self.assertEqual(2, index.num_versions())
420
self.assertEqual((1, 2), index.get_position(u"a"))
421
self.assertEqual((2, 3), index.get_position(u"b"))
422
self.assertEqual(["opt"], index.get_options(u"a"))
423
self.assertEqual(["option"], index.get_options(u"b"))
424
self.assertEqual([u"c"], index.get_parents_with_ghosts(u"a"))
425
self.assertEqual([u"a"], index.get_parents_with_ghosts(u"b"))
427
def test_delay_create_and_add_versions(self):
428
transport = MockTransport()
430
index = _KnitIndex(transport, "filename", "w",
431
create=True, file_mode="wb", create_parent_dir=True,
432
delay_create=True, dir_mode=0777)
433
self.assertEqual([], transport.calls)
436
(u"a", ["option"], 0, 1, [u"b"]),
437
(u"a", ["opt"], 1, 2, [u"c"]),
438
(u"b", ["option"], 2, 3, [u"a"])
440
name, (filename, f), kwargs = transport.calls.pop(0)
441
self.assertEqual("put_file_non_atomic", name)
443
{"dir_mode": 0777, "create_parent_dir": True, "mode": "wb"},
445
self.assertEqual("filename", filename)
448
"\na option 0 1 .b :"
450
"\nb option 2 3 0 :",
453
def test_has_version(self):
454
transport = MockTransport([
458
index = _KnitIndex(transport, "filename", "r")
460
self.assertTrue(index.has_version(u"a"))
461
self.assertFalse(index.has_version(u"b"))
463
def test_get_position(self):
464
transport = MockTransport([
469
index = _KnitIndex(transport, "filename", "r")
471
self.assertEqual((0, 1), index.get_position(u"a"))
472
self.assertEqual((1, 2), index.get_position(u"b"))
474
def test_get_method(self):
475
transport = MockTransport([
477
"a fulltext,unknown 0 1 :",
478
"b unknown,line-delta 1 2 :",
481
index = _KnitIndex(transport, "filename", "r")
483
self.assertEqual("fulltext", index.get_method(u"a"))
484
self.assertEqual("line-delta", index.get_method(u"b"))
485
self.assertRaises(AssertionError, index.get_method, u"c")
487
def test_get_options(self):
488
transport = MockTransport([
493
index = _KnitIndex(transport, "filename", "r")
495
self.assertEqual(["opt1"], index.get_options(u"a"))
496
self.assertEqual(["opt2", "opt3"], index.get_options(u"b"))
498
def test_get_parents(self):
499
transport = MockTransport([
502
"b option 1 2 0 .c :",
503
"c option 1 2 1 0 .e :"
505
index = _KnitIndex(transport, "filename", "r")
507
self.assertEqual([], index.get_parents(u"a"))
508
self.assertEqual([u"a", u"c"], index.get_parents(u"b"))
509
self.assertEqual([u"b", u"a"], index.get_parents(u"c"))
511
def test_get_parents_with_ghosts(self):
512
transport = MockTransport([
515
"b option 1 2 0 .c :",
516
"c option 1 2 1 0 .e :"
518
index = _KnitIndex(transport, "filename", "r")
520
self.assertEqual([], index.get_parents_with_ghosts(u"a"))
521
self.assertEqual([u"a", u"c"], index.get_parents_with_ghosts(u"b"))
522
self.assertEqual([u"b", u"a", u"e"],
523
index.get_parents_with_ghosts(u"c"))
525
def test_check_versions_present(self):
526
transport = MockTransport([
531
index = _KnitIndex(transport, "filename", "r")
533
check = index.check_versions_present
539
self.assertRaises(RevisionNotPresent, check, [u"c"])
540
self.assertRaises(RevisionNotPresent, check, [u"a", u"b", u"c"])
543
class KnitTests(TestCaseWithTransport):
544
"""Class containing knit test helper routines."""
546
def make_test_knit(self, annotate=False, delay_create=False):
548
factory = KnitPlainFactory()
551
return KnitVersionedFile('test', get_transport('.'), access_mode='w',
552
factory=factory, create=True,
553
delay_create=delay_create)
556
class BasicKnitTests(KnitTests):
558
def add_stock_one_and_one_a(self, k):
559
k.add_lines('text-1', [], split_lines(TEXT_1))
560
k.add_lines('text-1a', ['text-1'], split_lines(TEXT_1A))
562
def test_knit_constructor(self):
563
"""Construct empty k"""
564
self.make_test_knit()
566
def test_knit_add(self):
567
"""Store one text in knit and retrieve"""
568
k = self.make_test_knit()
569
k.add_lines('text-1', [], split_lines(TEXT_1))
570
self.assertTrue(k.has_version('text-1'))
571
self.assertEqualDiff(''.join(k.get_lines('text-1')), TEXT_1)
573
def test_knit_reload(self):
574
# test that the content in a reloaded knit is correct
575
k = self.make_test_knit()
576
k.add_lines('text-1', [], split_lines(TEXT_1))
578
k2 = KnitVersionedFile('test', get_transport('.'), access_mode='r', factory=KnitPlainFactory(), create=True)
579
self.assertTrue(k2.has_version('text-1'))
580
self.assertEqualDiff(''.join(k2.get_lines('text-1')), TEXT_1)
582
def test_knit_several(self):
583
"""Store several texts in a knit"""
584
k = self.make_test_knit()
585
k.add_lines('text-1', [], split_lines(TEXT_1))
586
k.add_lines('text-2', [], split_lines(TEXT_2))
587
self.assertEqualDiff(''.join(k.get_lines('text-1')), TEXT_1)
588
self.assertEqualDiff(''.join(k.get_lines('text-2')), TEXT_2)
590
def test_repeated_add(self):
591
"""Knit traps attempt to replace existing version"""
592
k = self.make_test_knit()
593
k.add_lines('text-1', [], split_lines(TEXT_1))
594
self.assertRaises(RevisionAlreadyPresent,
596
'text-1', [], split_lines(TEXT_1))
598
def test_empty(self):
599
k = self.make_test_knit(True)
600
k.add_lines('text-1', [], [])
601
self.assertEquals(k.get_lines('text-1'), [])
603
def test_incomplete(self):
604
"""Test if texts without a ending line-end can be inserted and
606
k = KnitVersionedFile('test', get_transport('.'), delta=False, create=True)
607
k.add_lines('text-1', [], ['a\n', 'b' ])
608
k.add_lines('text-2', ['text-1'], ['a\rb\n', 'b\n'])
609
# reopening ensures maximum room for confusion
610
k = KnitVersionedFile('test', get_transport('.'), delta=False, create=True)
611
self.assertEquals(k.get_lines('text-1'), ['a\n', 'b' ])
612
self.assertEquals(k.get_lines('text-2'), ['a\rb\n', 'b\n'])
614
def test_delta(self):
615
"""Expression of knit delta as lines"""
616
k = self.make_test_knit()
617
td = list(line_delta(TEXT_1.splitlines(True),
618
TEXT_1A.splitlines(True)))
619
self.assertEqualDiff(''.join(td), delta_1_1a)
620
out = apply_line_delta(TEXT_1.splitlines(True), td)
621
self.assertEqualDiff(''.join(out), TEXT_1A)
623
def test_add_with_parents(self):
624
"""Store in knit with parents"""
625
k = self.make_test_knit()
626
self.add_stock_one_and_one_a(k)
627
self.assertEquals(k.get_parents('text-1'), [])
628
self.assertEquals(k.get_parents('text-1a'), ['text-1'])
630
def test_ancestry(self):
631
"""Store in knit with parents"""
632
k = self.make_test_knit()
633
self.add_stock_one_and_one_a(k)
634
self.assertEquals(set(k.get_ancestry(['text-1a'])), set(['text-1a', 'text-1']))
636
def test_add_delta(self):
637
"""Store in knit with parents"""
638
k = KnitVersionedFile('test', get_transport('.'), factory=KnitPlainFactory(),
639
delta=True, create=True)
640
self.add_stock_one_and_one_a(k)
642
self.assertEqualDiff(''.join(k.get_lines('text-1a')), TEXT_1A)
644
def test_annotate(self):
646
k = KnitVersionedFile('knit', get_transport('.'), factory=KnitAnnotateFactory(),
647
delta=True, create=True)
648
self.insert_and_test_small_annotate(k)
650
def insert_and_test_small_annotate(self, k):
651
"""test annotation with k works correctly."""
652
k.add_lines('text-1', [], ['a\n', 'b\n'])
653
k.add_lines('text-2', ['text-1'], ['a\n', 'c\n'])
655
origins = k.annotate('text-2')
656
self.assertEquals(origins[0], ('text-1', 'a\n'))
657
self.assertEquals(origins[1], ('text-2', 'c\n'))
659
def test_annotate_fulltext(self):
661
k = KnitVersionedFile('knit', get_transport('.'), factory=KnitAnnotateFactory(),
662
delta=False, create=True)
663
self.insert_and_test_small_annotate(k)
665
def test_annotate_merge_1(self):
666
k = self.make_test_knit(True)
667
k.add_lines('text-a1', [], ['a\n', 'b\n'])
668
k.add_lines('text-a2', [], ['d\n', 'c\n'])
669
k.add_lines('text-am', ['text-a1', 'text-a2'], ['d\n', 'b\n'])
670
origins = k.annotate('text-am')
671
self.assertEquals(origins[0], ('text-a2', 'd\n'))
672
self.assertEquals(origins[1], ('text-a1', 'b\n'))
674
def test_annotate_merge_2(self):
675
k = self.make_test_knit(True)
676
k.add_lines('text-a1', [], ['a\n', 'b\n', 'c\n'])
677
k.add_lines('text-a2', [], ['x\n', 'y\n', 'z\n'])
678
k.add_lines('text-am', ['text-a1', 'text-a2'], ['a\n', 'y\n', 'c\n'])
679
origins = k.annotate('text-am')
680
self.assertEquals(origins[0], ('text-a1', 'a\n'))
681
self.assertEquals(origins[1], ('text-a2', 'y\n'))
682
self.assertEquals(origins[2], ('text-a1', 'c\n'))
684
def test_annotate_merge_9(self):
685
k = self.make_test_knit(True)
686
k.add_lines('text-a1', [], ['a\n', 'b\n', 'c\n'])
687
k.add_lines('text-a2', [], ['x\n', 'y\n', 'z\n'])
688
k.add_lines('text-am', ['text-a1', 'text-a2'], ['k\n', 'y\n', 'c\n'])
689
origins = k.annotate('text-am')
690
self.assertEquals(origins[0], ('text-am', 'k\n'))
691
self.assertEquals(origins[1], ('text-a2', 'y\n'))
692
self.assertEquals(origins[2], ('text-a1', 'c\n'))
694
def test_annotate_merge_3(self):
695
k = self.make_test_knit(True)
696
k.add_lines('text-a1', [], ['a\n', 'b\n', 'c\n'])
697
k.add_lines('text-a2', [] ,['x\n', 'y\n', 'z\n'])
698
k.add_lines('text-am', ['text-a1', 'text-a2'], ['k\n', 'y\n', 'z\n'])
699
origins = k.annotate('text-am')
700
self.assertEquals(origins[0], ('text-am', 'k\n'))
701
self.assertEquals(origins[1], ('text-a2', 'y\n'))
702
self.assertEquals(origins[2], ('text-a2', 'z\n'))
704
def test_annotate_merge_4(self):
705
k = self.make_test_knit(True)
706
k.add_lines('text-a1', [], ['a\n', 'b\n', 'c\n'])
707
k.add_lines('text-a2', [], ['x\n', 'y\n', 'z\n'])
708
k.add_lines('text-a3', ['text-a1'], ['a\n', 'b\n', 'p\n'])
709
k.add_lines('text-am', ['text-a2', 'text-a3'], ['a\n', 'b\n', 'z\n'])
710
origins = k.annotate('text-am')
711
self.assertEquals(origins[0], ('text-a1', 'a\n'))
712
self.assertEquals(origins[1], ('text-a1', 'b\n'))
713
self.assertEquals(origins[2], ('text-a2', 'z\n'))
715
def test_annotate_merge_5(self):
716
k = self.make_test_knit(True)
717
k.add_lines('text-a1', [], ['a\n', 'b\n', 'c\n'])
718
k.add_lines('text-a2', [], ['d\n', 'e\n', 'f\n'])
719
k.add_lines('text-a3', [], ['x\n', 'y\n', 'z\n'])
720
k.add_lines('text-am',
721
['text-a1', 'text-a2', 'text-a3'],
722
['a\n', 'e\n', 'z\n'])
723
origins = k.annotate('text-am')
724
self.assertEquals(origins[0], ('text-a1', 'a\n'))
725
self.assertEquals(origins[1], ('text-a2', 'e\n'))
726
self.assertEquals(origins[2], ('text-a3', 'z\n'))
728
def test_annotate_file_cherry_pick(self):
729
k = self.make_test_knit(True)
730
k.add_lines('text-1', [], ['a\n', 'b\n', 'c\n'])
731
k.add_lines('text-2', ['text-1'], ['d\n', 'e\n', 'f\n'])
732
k.add_lines('text-3', ['text-2', 'text-1'], ['a\n', 'b\n', 'c\n'])
733
origins = k.annotate('text-3')
734
self.assertEquals(origins[0], ('text-1', 'a\n'))
735
self.assertEquals(origins[1], ('text-1', 'b\n'))
736
self.assertEquals(origins[2], ('text-1', 'c\n'))
738
def test_knit_join(self):
739
"""Store in knit with parents"""
740
k1 = KnitVersionedFile('test1', get_transport('.'), factory=KnitPlainFactory(), create=True)
741
k1.add_lines('text-a', [], split_lines(TEXT_1))
742
k1.add_lines('text-b', ['text-a'], split_lines(TEXT_1))
744
k1.add_lines('text-c', [], split_lines(TEXT_1))
745
k1.add_lines('text-d', ['text-c'], split_lines(TEXT_1))
747
k1.add_lines('text-m', ['text-b', 'text-d'], split_lines(TEXT_1))
749
k2 = KnitVersionedFile('test2', get_transport('.'), factory=KnitPlainFactory(), create=True)
750
count = k2.join(k1, version_ids=['text-m'])
751
self.assertEquals(count, 5)
752
self.assertTrue(k2.has_version('text-a'))
753
self.assertTrue(k2.has_version('text-c'))
755
def test_reannotate(self):
756
k1 = KnitVersionedFile('knit1', get_transport('.'),
757
factory=KnitAnnotateFactory(), create=True)
759
k1.add_lines('text-a', [], ['a\n', 'b\n'])
761
k1.add_lines('text-b', ['text-a'], ['a\n', 'c\n'])
763
k2 = KnitVersionedFile('test2', get_transport('.'),
764
factory=KnitAnnotateFactory(), create=True)
765
k2.join(k1, version_ids=['text-b'])
768
k1.add_lines('text-X', ['text-b'], ['a\n', 'b\n'])
770
k2.add_lines('text-c', ['text-b'], ['z\n', 'c\n'])
772
k2.add_lines('text-Y', ['text-b'], ['b\n', 'c\n'])
774
# test-c will have index 3
775
k1.join(k2, version_ids=['text-c'])
777
lines = k1.get_lines('text-c')
778
self.assertEquals(lines, ['z\n', 'c\n'])
780
origins = k1.annotate('text-c')
781
self.assertEquals(origins[0], ('text-c', 'z\n'))
782
self.assertEquals(origins[1], ('text-b', 'c\n'))
784
def test_get_line_delta_texts(self):
785
"""Make sure we can call get_texts on text with reused line deltas"""
786
k1 = KnitVersionedFile('test1', get_transport('.'),
787
factory=KnitPlainFactory(), create=True)
792
parents = ['%d' % (t-1)]
793
k1.add_lines('%d' % t, parents, ['hello\n'] * t)
794
k1.get_texts(('%d' % t) for t in range(3))
796
def test_iter_lines_reads_in_order(self):
797
t = MemoryTransport()
798
instrumented_t = TransportLogger(t)
799
k1 = KnitVersionedFile('id', instrumented_t, create=True, delta=True)
800
self.assertEqual([('id.kndx',)], instrumented_t._calls)
801
# add texts with no required ordering
802
k1.add_lines('base', [], ['text\n'])
803
k1.add_lines('base2', [], ['text2\n'])
805
instrumented_t._calls = []
806
# request a last-first iteration
807
results = list(k1.iter_lines_added_or_present_in_versions(['base2', 'base']))
808
self.assertEqual([('id.knit', [(0, 87), (87, 89)])], instrumented_t._calls)
809
self.assertEqual(['text\n', 'text2\n'], results)
811
def test_create_empty_annotated(self):
812
k1 = self.make_test_knit(True)
814
k1.add_lines('text-a', [], ['a\n', 'b\n'])
815
k2 = k1.create_empty('t', MemoryTransport())
816
self.assertTrue(isinstance(k2.factory, KnitAnnotateFactory))
817
self.assertEqual(k1.delta, k2.delta)
818
# the generic test checks for empty content and file class
820
def test_knit_format(self):
821
# this tests that a new knit index file has the expected content
822
# and that is writes the data we expect as records are added.
823
knit = self.make_test_knit(True)
824
# Now knit files are not created until we first add data to them
825
self.assertFileEqual("# bzr knit index 8\n", 'test.kndx')
826
knit.add_lines_with_ghosts('revid', ['a_ghost'], ['a\n'])
827
self.assertFileEqual(
828
"# bzr knit index 8\n"
830
"revid fulltext 0 84 .a_ghost :",
832
knit.add_lines_with_ghosts('revid2', ['revid'], ['a\n'])
833
self.assertFileEqual(
834
"# bzr knit index 8\n"
835
"\nrevid fulltext 0 84 .a_ghost :"
836
"\nrevid2 line-delta 84 82 0 :",
838
# we should be able to load this file again
839
knit = KnitVersionedFile('test', get_transport('.'), access_mode='r')
840
self.assertEqual(['revid', 'revid2'], knit.versions())
841
# write a short write to the file and ensure that its ignored
842
indexfile = file('test.kndx', 'at')
843
indexfile.write('\nrevid3 line-delta 166 82 1 2 3 4 5 .phwoar:demo ')
845
# we should be able to load this file again
846
knit = KnitVersionedFile('test', get_transport('.'), access_mode='w')
847
self.assertEqual(['revid', 'revid2'], knit.versions())
848
# and add a revision with the same id the failed write had
849
knit.add_lines('revid3', ['revid2'], ['a\n'])
850
# and when reading it revid3 should now appear.
851
knit = KnitVersionedFile('test', get_transport('.'), access_mode='r')
852
self.assertEqual(['revid', 'revid2', 'revid3'], knit.versions())
853
self.assertEqual(['revid2'], knit.get_parents('revid3'))
855
def test_delay_create(self):
856
"""Test that passing delay_create=True creates files late"""
857
knit = self.make_test_knit(annotate=True, delay_create=True)
858
self.failIfExists('test.knit')
859
self.failIfExists('test.kndx')
860
knit.add_lines_with_ghosts('revid', ['a_ghost'], ['a\n'])
861
self.failUnlessExists('test.knit')
862
self.assertFileEqual(
863
"# bzr knit index 8\n"
865
"revid fulltext 0 84 .a_ghost :",
868
def test_create_parent_dir(self):
869
"""create_parent_dir can create knits in nonexistant dirs"""
870
# Has no effect if we don't set 'delay_create'
871
trans = get_transport('.')
872
self.assertRaises(NoSuchFile, KnitVersionedFile, 'dir/test',
873
trans, access_mode='w', factory=None,
874
create=True, create_parent_dir=True)
875
# Nothing should have changed yet
876
knit = KnitVersionedFile('dir/test', trans, access_mode='w',
877
factory=None, create=True,
878
create_parent_dir=True,
880
self.failIfExists('dir/test.knit')
881
self.failIfExists('dir/test.kndx')
882
self.failIfExists('dir')
883
knit.add_lines('revid', [], ['a\n'])
884
self.failUnlessExists('dir')
885
self.failUnlessExists('dir/test.knit')
886
self.assertFileEqual(
887
"# bzr knit index 8\n"
889
"revid fulltext 0 84 :",
892
def test_create_mode_700(self):
893
trans = get_transport('.')
894
if not trans._can_roundtrip_unix_modebits():
895
# Can't roundtrip, so no need to run this test
897
knit = KnitVersionedFile('dir/test', trans, access_mode='w',
898
factory=None, create=True,
899
create_parent_dir=True,
903
knit.add_lines('revid', [], ['a\n'])
904
self.assertTransportMode(trans, 'dir', 0700)
905
self.assertTransportMode(trans, 'dir/test.knit', 0600)
906
self.assertTransportMode(trans, 'dir/test.kndx', 0600)
908
def test_create_mode_770(self):
909
trans = get_transport('.')
910
if not trans._can_roundtrip_unix_modebits():
911
# Can't roundtrip, so no need to run this test
913
knit = KnitVersionedFile('dir/test', trans, access_mode='w',
914
factory=None, create=True,
915
create_parent_dir=True,
919
knit.add_lines('revid', [], ['a\n'])
920
self.assertTransportMode(trans, 'dir', 0770)
921
self.assertTransportMode(trans, 'dir/test.knit', 0660)
922
self.assertTransportMode(trans, 'dir/test.kndx', 0660)
924
def test_create_mode_777(self):
925
trans = get_transport('.')
926
if not trans._can_roundtrip_unix_modebits():
927
# Can't roundtrip, so no need to run this test
929
knit = KnitVersionedFile('dir/test', trans, access_mode='w',
930
factory=None, create=True,
931
create_parent_dir=True,
935
knit.add_lines('revid', [], ['a\n'])
936
self.assertTransportMode(trans, 'dir', 0777)
937
self.assertTransportMode(trans, 'dir/test.knit', 0666)
938
self.assertTransportMode(trans, 'dir/test.kndx', 0666)
940
def test_plan_merge(self):
941
my_knit = self.make_test_knit(annotate=True)
942
my_knit.add_lines('text1', [], split_lines(TEXT_1))
943
my_knit.add_lines('text1a', ['text1'], split_lines(TEXT_1A))
944
my_knit.add_lines('text1b', ['text1'], split_lines(TEXT_1B))
945
plan = list(my_knit.plan_merge('text1a', 'text1b'))
946
for plan_line, expected_line in zip(plan, AB_MERGE):
947
self.assertEqual(plan_line, expected_line)
959
Banana cup cake recipe
969
Banana cup cake recipe
971
- bananas (do not use plantains!!!)
978
Banana cup cake recipe
994
AB_MERGE_TEXT="""unchanged|Banana cup cake recipe
999
new-b|- bananas (do not use plantains!!!)
1000
unchanged|- broken tea cups
1001
new-a|- self-raising flour
1004
AB_MERGE=[tuple(l.split('|')) for l in AB_MERGE_TEXT.splitlines(True)]
1007
def line_delta(from_lines, to_lines):
1008
"""Generate line-based delta from one text to another"""
1009
s = difflib.SequenceMatcher(None, from_lines, to_lines)
1010
for op in s.get_opcodes():
1011
if op[0] == 'equal':
1013
yield '%d,%d,%d\n' % (op[1], op[2], op[4]-op[3])
1014
for i in range(op[3], op[4]):
1018
def apply_line_delta(basis_lines, delta_lines):
1019
"""Apply a line-based perfect diff
1021
basis_lines -- text to apply the patch to
1022
delta_lines -- diff instructions and content
1024
out = basis_lines[:]
1027
while i < len(delta_lines):
1029
a, b, c = map(long, l.split(','))
1031
out[offset+a:offset+b] = delta_lines[i:i+c]
1033
offset = offset + (b - a) + c
1037
class TestWeaveToKnit(KnitTests):
1039
def test_weave_to_knit_matches(self):
1040
# check that the WeaveToKnit is_compatible function
1041
# registers True for a Weave to a Knit.
1043
k = self.make_test_knit()
1044
self.failUnless(WeaveToKnit.is_compatible(w, k))
1045
self.failIf(WeaveToKnit.is_compatible(k, w))
1046
self.failIf(WeaveToKnit.is_compatible(w, w))
1047
self.failIf(WeaveToKnit.is_compatible(k, k))
1050
class TestKnitCaching(KnitTests):
1052
def create_knit(self, cache_add=False):
1053
k = self.make_test_knit(True)
1057
k.add_lines('text-1', [], split_lines(TEXT_1))
1058
k.add_lines('text-2', [], split_lines(TEXT_2))
1061
def test_no_caching(self):
1062
k = self.create_knit()
1063
# Nothing should be cached without setting 'enable_cache'
1064
self.assertEqual({}, k._data._cache)
1066
def test_cache_add_and_clear(self):
1067
k = self.create_knit(True)
1069
self.assertEqual(['text-1', 'text-2'], sorted(k._data._cache.keys()))
1072
self.assertEqual({}, k._data._cache)
1074
def test_cache_data_read_raw(self):
1075
k = self.create_knit()
1077
# Now cache and read
1080
def read_one_raw(version):
1081
pos_map = k._get_components_positions([version])
1082
method, pos, size, next = pos_map[version]
1083
lst = list(k._data.read_records_iter_raw([(version, pos, size)]))
1084
self.assertEqual(1, len(lst))
1087
val = read_one_raw('text-1')
1088
self.assertEqual({'text-1':val[1]}, k._data._cache)
1091
# After clear, new reads are not cached
1092
self.assertEqual({}, k._data._cache)
1094
val2 = read_one_raw('text-1')
1095
self.assertEqual(val, val2)
1096
self.assertEqual({}, k._data._cache)
1098
def test_cache_data_read(self):
1099
k = self.create_knit()
1101
def read_one(version):
1102
pos_map = k._get_components_positions([version])
1103
method, pos, size, next = pos_map[version]
1104
lst = list(k._data.read_records_iter([(version, pos, size)]))
1105
self.assertEqual(1, len(lst))
1108
# Now cache and read
1111
val = read_one('text-2')
1112
self.assertEqual(['text-2'], k._data._cache.keys())
1113
self.assertEqual('text-2', val[0])
1114
content, digest = k._data._parse_record('text-2',
1115
k._data._cache['text-2'])
1116
self.assertEqual(content, val[1])
1117
self.assertEqual(digest, val[2])
1120
self.assertEqual({}, k._data._cache)
1122
val2 = read_one('text-2')
1123
self.assertEqual(val, val2)
1124
self.assertEqual({}, k._data._cache)
1126
def test_cache_read(self):
1127
k = self.create_knit()
1130
text = k.get_text('text-1')
1131
self.assertEqual(TEXT_1, text)
1132
self.assertEqual(['text-1'], k._data._cache.keys())
1135
self.assertEqual({}, k._data._cache)
1137
text = k.get_text('text-1')
1138
self.assertEqual(TEXT_1, text)
1139
self.assertEqual({}, k._data._cache)
1142
class TestKnitIndex(KnitTests):
1144
def test_add_versions_dictionary_compresses(self):
1145
"""Adding versions to the index should update the lookup dict"""
1146
knit = self.make_test_knit()
1148
idx.add_version('a-1', ['fulltext'], 0, 0, [])
1149
self.check_file_contents('test.kndx',
1150
'# bzr knit index 8\n'
1152
'a-1 fulltext 0 0 :'
1154
idx.add_versions([('a-2', ['fulltext'], 0, 0, ['a-1']),
1155
('a-3', ['fulltext'], 0, 0, ['a-2']),
1157
self.check_file_contents('test.kndx',
1158
'# bzr knit index 8\n'
1160
'a-1 fulltext 0 0 :\n'
1161
'a-2 fulltext 0 0 0 :\n'
1162
'a-3 fulltext 0 0 1 :'
1164
self.assertEqual(['a-1', 'a-2', 'a-3'], idx._history)
1165
self.assertEqual({'a-1':('a-1', ['fulltext'], 0, 0, [], 0),
1166
'a-2':('a-2', ['fulltext'], 0, 0, ['a-1'], 1),
1167
'a-3':('a-3', ['fulltext'], 0, 0, ['a-2'], 2),
1170
def test_add_versions_fails_clean(self):
1171
"""If add_versions fails in the middle, it restores a pristine state.
1173
Any modifications that are made to the index are reset if all versions
1176
# This cheats a little bit by passing in a generator which will
1177
# raise an exception before the processing finishes
1178
# Other possibilities would be to have an version with the wrong number
1179
# of entries, or to make the backing transport unable to write any
1182
knit = self.make_test_knit()
1184
idx.add_version('a-1', ['fulltext'], 0, 0, [])
1186
class StopEarly(Exception):
1189
def generate_failure():
1190
"""Add some entries and then raise an exception"""
1191
yield ('a-2', ['fulltext'], 0, 0, ['a-1'])
1192
yield ('a-3', ['fulltext'], 0, 0, ['a-2'])
1195
# Assert the pre-condition
1196
self.assertEqual(['a-1'], idx._history)
1197
self.assertEqual({'a-1':('a-1', ['fulltext'], 0, 0, [], 0)}, idx._cache)
1199
self.assertRaises(StopEarly, idx.add_versions, generate_failure())
1201
# And it shouldn't be modified
1202
self.assertEqual(['a-1'], idx._history)
1203
self.assertEqual({'a-1':('a-1', ['fulltext'], 0, 0, [], 0)}, idx._cache)
1205
def test_knit_index_ignores_empty_files(self):
1206
# There was a race condition in older bzr, where a ^C at the right time
1207
# could leave an empty .kndx file, which bzr would later claim was a
1208
# corrupted file since the header was not present. In reality, the file
1209
# just wasn't created, so it should be ignored.
1210
t = get_transport('.')
1211
t.put_bytes('test.kndx', '')
1213
knit = self.make_test_knit()
1215
def test_knit_index_checks_header(self):
1216
t = get_transport('.')
1217
t.put_bytes('test.kndx', '# not really a knit header\n\n')
1219
self.assertRaises(KnitHeaderError, self.make_test_knit)