1
# Copyright (C) 2005, 2006 Canonical Ltd
3
# This program is free software; you can redistribute it and/or modify
4
# it under the terms of the GNU General Public License as published by
5
# the Free Software Foundation; either version 2 of the License, or
6
# (at your option) any later version.
8
# This program is distributed in the hope that it will be useful,
9
# but WITHOUT ANY WARRANTY; without even the implied warranty of
10
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11
# GNU General Public License for more details.
13
# You should have received a copy of the GNU General Public License
14
# along with this program; if not, write to the Free Software
15
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
17
"""Tests for Knit data structure"""
19
from cStringIO import StringIO
22
from bzrlib.errors import (
23
RevisionAlreadyPresent,
28
from bzrlib.knit import (
36
from bzrlib.osutils import split_lines
37
from bzrlib.tests import TestCase, TestCaseWithTransport
38
from bzrlib.transport import TransportLogger, get_transport
39
from bzrlib.transport.memory import MemoryTransport
40
from bzrlib.weave import Weave
43
class KnitContentTests(TestCase):
45
def test_constructor(self):
46
content = KnitContent([])
49
content = KnitContent([])
50
self.assertEqual(content.text(), [])
52
content = KnitContent([("origin1", "text1"), ("origin2", "text2")])
53
self.assertEqual(content.text(), ["text1", "text2"])
55
def test_annotate(self):
56
content = KnitContent([])
57
self.assertEqual(content.annotate(), [])
59
content = KnitContent([("origin1", "text1"), ("origin2", "text2")])
60
self.assertEqual(content.annotate(),
61
[("origin1", "text1"), ("origin2", "text2")])
63
def test_annotate_iter(self):
64
content = KnitContent([])
65
it = content.annotate_iter()
66
self.assertRaises(StopIteration, it.next)
68
content = KnitContent([("origin1", "text1"), ("origin2", "text2")])
69
it = content.annotate_iter()
70
self.assertEqual(it.next(), ("origin1", "text1"))
71
self.assertEqual(it.next(), ("origin2", "text2"))
72
self.assertRaises(StopIteration, it.next)
75
content = KnitContent([("origin1", "text1"), ("origin2", "text2")])
77
self.assertIsInstance(copy, KnitContent)
78
self.assertEqual(copy.annotate(),
79
[("origin1", "text1"), ("origin2", "text2")])
81
def test_line_delta(self):
82
content1 = KnitContent([("", "a"), ("", "b")])
83
content2 = KnitContent([("", "a"), ("", "a"), ("", "c")])
84
self.assertEqual(content1.line_delta(content2),
85
[(1, 2, 2, [("", "a"), ("", "c")])])
87
def test_line_delta_iter(self):
88
content1 = KnitContent([("", "a"), ("", "b")])
89
content2 = KnitContent([("", "a"), ("", "a"), ("", "c")])
90
it = content1.line_delta_iter(content2)
91
self.assertEqual(it.next(), (1, 2, 2, [("", "a"), ("", "c")]))
92
self.assertRaises(StopIteration, it.next)
95
class MockTransport(object):
97
def __init__(self, file_lines=None):
98
self.file_lines = file_lines
100
# We have no base directory for the MockTransport
103
def get(self, filename):
104
if self.file_lines is None:
105
raise NoSuchFile(filename)
107
return StringIO("\n".join(self.file_lines))
109
def __getattr__(self, name):
110
def queue_call(*args, **kwargs):
111
self.calls.append((name, args, kwargs))
115
class LowLevelKnitIndexTests(TestCase):
117
def test_no_such_file(self):
118
transport = MockTransport()
120
self.assertRaises(NoSuchFile, _KnitIndex, transport, "filename", "r")
121
self.assertRaises(NoSuchFile, _KnitIndex, transport,
122
"filename", "w", create=False)
124
def test_create_file(self):
125
transport = MockTransport()
127
index = _KnitIndex(transport, "filename", "w",
128
file_mode="wb", create=True)
130
("put_bytes_non_atomic",
131
("filename", index.HEADER), {"mode": "wb"}),
132
transport.calls.pop(0))
134
def test_delay_create_file(self):
135
transport = MockTransport()
137
index = _KnitIndex(transport, "filename", "w",
138
create=True, file_mode="wb", create_parent_dir=True,
139
delay_create=True, dir_mode=0777)
140
self.assertEqual([], transport.calls)
142
index.add_versions([])
143
name, (filename, f), kwargs = transport.calls.pop(0)
144
self.assertEqual("put_file_non_atomic", name)
146
{"dir_mode": 0777, "create_parent_dir": True, "mode": "wb"},
148
self.assertEqual("filename", filename)
149
self.assertEqual(index.HEADER, f.read())
151
index.add_versions([])
152
self.assertEqual(("append_bytes", ("filename", ""), {}),
153
transport.calls.pop(0))
155
def test_read_utf8_version_id(self):
156
transport = MockTransport([
158
u"version-\N{CYRILLIC CAPITAL LETTER A}"
159
u" option 0 1 :".encode("utf-8")
161
index = _KnitIndex(transport, "filename", "r")
163
index.has_version(u"version-\N{CYRILLIC CAPITAL LETTER A}"))
165
def test_read_utf8_parents(self):
166
transport = MockTransport([
168
u"version option 0 1"
169
u" .version-\N{CYRILLIC CAPITAL LETTER A} :".encode("utf-8")
171
index = _KnitIndex(transport, "filename", "r")
172
self.assertEqual([u"version-\N{CYRILLIC CAPITAL LETTER A}"],
173
index.get_parents_with_ghosts("version"))
175
def test_read_ignore_corrupted_lines(self):
176
transport = MockTransport([
179
"corrupted options 0 1 .b .c ",
180
"version options 0 1 :"
182
index = _KnitIndex(transport, "filename", "r")
183
self.assertEqual(1, index.num_versions())
184
self.assertTrue(index.has_version(u"version"))
186
def test_read_corrupted_header(self):
187
transport = MockTransport(['not a bzr knit index header\n'])
188
self.assertRaises(KnitHeaderError,
189
_KnitIndex, transport, "filename", "r")
191
def test_read_duplicate_entries(self):
192
transport = MockTransport([
194
"parent options 0 1 :",
195
"version options1 0 1 0 :",
196
"version options2 1 2 .other :",
197
"version options3 3 4 0 .other :"
199
index = _KnitIndex(transport, "filename", "r")
200
self.assertEqual(2, index.num_versions())
201
self.assertEqual(1, index.lookup(u"version"))
202
self.assertEqual((3, 4), index.get_position(u"version"))
203
self.assertEqual(["options3"], index.get_options(u"version"))
204
self.assertEqual([u"parent", u"other"],
205
index.get_parents_with_ghosts(u"version"))
207
def test_read_compressed_parents(self):
208
transport = MockTransport([
212
"c option 0 1 1 0 :",
214
index = _KnitIndex(transport, "filename", "r")
215
self.assertEqual([u"a"], index.get_parents(u"b"))
216
self.assertEqual([u"b", u"a"], index.get_parents(u"c"))
218
def test_write_utf8_version_id(self):
219
transport = MockTransport([
222
index = _KnitIndex(transport, "filename", "r")
223
index.add_version(u"version-\N{CYRILLIC CAPITAL LETTER A}",
224
["option"], 0, 1, [])
225
self.assertEqual(("append_bytes", ("filename",
226
u"\nversion-\N{CYRILLIC CAPITAL LETTER A}"
227
u" option 0 1 :".encode("utf-8")),
229
transport.calls.pop(0))
231
def test_write_utf8_parents(self):
232
transport = MockTransport([
235
index = _KnitIndex(transport, "filename", "r")
236
index.add_version(u"version", ["option"], 0, 1,
237
[u"version-\N{CYRILLIC CAPITAL LETTER A}"])
238
self.assertEqual(("append_bytes", ("filename",
239
u"\nversion option 0 1"
240
u" .version-\N{CYRILLIC CAPITAL LETTER A} :".encode("utf-8")),
242
transport.calls.pop(0))
244
def test_get_graph(self):
245
transport = MockTransport()
246
index = _KnitIndex(transport, "filename", "w", create=True)
247
self.assertEqual([], index.get_graph())
249
index.add_version(u"a", ["option"], 0, 1, [u"b"])
250
self.assertEqual([(u"a", [u"b"])], index.get_graph())
252
index.add_version(u"c", ["option"], 0, 1, [u"d"])
253
self.assertEqual([(u"a", [u"b"]), (u"c", [u"d"])],
254
sorted(index.get_graph()))
256
def test_get_ancestry(self):
257
transport = MockTransport([
260
"b option 0 1 0 .e :",
261
"c option 0 1 1 0 :",
262
"d option 0 1 2 .f :"
264
index = _KnitIndex(transport, "filename", "r")
266
self.assertEqual([], index.get_ancestry([]))
267
self.assertEqual([u"a"], index.get_ancestry([u"a"]))
268
self.assertEqual([u"a", u"b"], index.get_ancestry([u"b"]))
269
self.assertEqual([u"a", u"b", u"c"], index.get_ancestry([u"c"]))
270
self.assertEqual([u"a", u"b", u"c", u"d"], index.get_ancestry([u"d"]))
271
self.assertEqual([u"a", u"b"], index.get_ancestry([u"a", u"b"]))
272
self.assertEqual([u"a", u"b", u"c"], index.get_ancestry([u"a", u"c"]))
274
self.assertRaises(RevisionNotPresent, index.get_ancestry, [u"e"])
276
def test_get_ancestry_with_ghosts(self):
277
transport = MockTransport([
280
"b option 0 1 0 .e :",
281
"c option 0 1 0 .f .g :",
282
"d option 0 1 2 .h .j .k :"
284
index = _KnitIndex(transport, "filename", "r")
286
self.assertEqual([], index.get_ancestry_with_ghosts([]))
287
self.assertEqual([u"a"], index.get_ancestry_with_ghosts([u"a"]))
288
self.assertEqual([u"a", u"e", u"b"],
289
index.get_ancestry_with_ghosts([u"b"]))
290
self.assertEqual([u"a", u"g", u"f", u"c"],
291
index.get_ancestry_with_ghosts([u"c"]))
292
self.assertEqual([u"a", u"g", u"f", u"c", u"k", u"j", u"h", u"d"],
293
index.get_ancestry_with_ghosts([u"d"]))
294
self.assertEqual([u"a", u"e", u"b"],
295
index.get_ancestry_with_ghosts([u"a", u"b"]))
296
self.assertEqual([u"a", u"g", u"f", u"c"],
297
index.get_ancestry_with_ghosts([u"a", u"c"]))
299
[u"a", u"g", u"f", u"c", u"e", u"b", u"k", u"j", u"h", u"d"],
300
index.get_ancestry_with_ghosts([u"b", u"d"]))
302
self.assertRaises(RevisionNotPresent,
303
index.get_ancestry_with_ghosts, [u"e"])
305
def test_num_versions(self):
306
transport = MockTransport([
309
index = _KnitIndex(transport, "filename", "r")
311
self.assertEqual(0, index.num_versions())
312
self.assertEqual(0, len(index))
314
index.add_version(u"a", ["option"], 0, 1, [])
315
self.assertEqual(1, index.num_versions())
316
self.assertEqual(1, len(index))
318
index.add_version(u"a", ["option2"], 1, 2, [])
319
self.assertEqual(1, index.num_versions())
320
self.assertEqual(1, len(index))
322
index.add_version(u"b", ["option"], 0, 1, [])
323
self.assertEqual(2, index.num_versions())
324
self.assertEqual(2, len(index))
326
def test_get_versions(self):
327
transport = MockTransport([
330
index = _KnitIndex(transport, "filename", "r")
332
self.assertEqual([], index.get_versions())
334
index.add_version(u"a", ["option"], 0, 1, [])
335
self.assertEqual([u"a"], index.get_versions())
337
index.add_version(u"a", ["option"], 0, 1, [])
338
self.assertEqual([u"a"], index.get_versions())
340
index.add_version(u"b", ["option"], 0, 1, [])
341
self.assertEqual([u"a", u"b"], index.get_versions())
343
def test_idx_to_name(self):
344
transport = MockTransport([
349
index = _KnitIndex(transport, "filename", "r")
351
self.assertEqual(u"a", index.idx_to_name(0))
352
self.assertEqual(u"b", index.idx_to_name(1))
353
self.assertEqual(u"b", index.idx_to_name(-1))
354
self.assertEqual(u"a", index.idx_to_name(-2))
356
def test_lookup(self):
357
transport = MockTransport([
362
index = _KnitIndex(transport, "filename", "r")
364
self.assertEqual(0, index.lookup(u"a"))
365
self.assertEqual(1, index.lookup(u"b"))
367
def test_add_version(self):
368
transport = MockTransport([
371
index = _KnitIndex(transport, "filename", "r")
373
index.add_version(u"a", ["option"], 0, 1, [u"b"])
374
self.assertEqual(("append_bytes",
375
("filename", "\na option 0 1 .b :"),
376
{}), transport.calls.pop(0))
377
self.assertTrue(index.has_version(u"a"))
378
self.assertEqual(1, index.num_versions())
379
self.assertEqual((0, 1), index.get_position(u"a"))
380
self.assertEqual(["option"], index.get_options(u"a"))
381
self.assertEqual([u"b"], index.get_parents_with_ghosts(u"a"))
383
index.add_version(u"a", ["opt"], 1, 2, [u"c"])
384
self.assertEqual(("append_bytes",
385
("filename", "\na opt 1 2 .c :"),
386
{}), transport.calls.pop(0))
387
self.assertTrue(index.has_version(u"a"))
388
self.assertEqual(1, index.num_versions())
389
self.assertEqual((1, 2), index.get_position(u"a"))
390
self.assertEqual(["opt"], index.get_options(u"a"))
391
self.assertEqual([u"c"], index.get_parents_with_ghosts(u"a"))
393
index.add_version(u"b", ["option"], 2, 3, [u"a"])
394
self.assertEqual(("append_bytes",
395
("filename", "\nb option 2 3 0 :"),
396
{}), transport.calls.pop(0))
397
self.assertTrue(index.has_version(u"b"))
398
self.assertEqual(2, index.num_versions())
399
self.assertEqual((2, 3), index.get_position(u"b"))
400
self.assertEqual(["option"], index.get_options(u"b"))
401
self.assertEqual([u"a"], index.get_parents_with_ghosts(u"b"))
403
def test_add_versions(self):
404
transport = MockTransport([
407
index = _KnitIndex(transport, "filename", "r")
410
(u"a", ["option"], 0, 1, [u"b"]),
411
(u"a", ["opt"], 1, 2, [u"c"]),
412
(u"b", ["option"], 2, 3, [u"a"])
414
self.assertEqual(("append_bytes", ("filename",
415
"\na option 0 1 .b :"
418
), {}), transport.calls.pop(0))
419
self.assertTrue(index.has_version(u"a"))
420
self.assertTrue(index.has_version(u"b"))
421
self.assertEqual(2, index.num_versions())
422
self.assertEqual((1, 2), index.get_position(u"a"))
423
self.assertEqual((2, 3), index.get_position(u"b"))
424
self.assertEqual(["opt"], index.get_options(u"a"))
425
self.assertEqual(["option"], index.get_options(u"b"))
426
self.assertEqual([u"c"], index.get_parents_with_ghosts(u"a"))
427
self.assertEqual([u"a"], index.get_parents_with_ghosts(u"b"))
429
def test_delay_create_and_add_versions(self):
430
transport = MockTransport()
432
index = _KnitIndex(transport, "filename", "w",
433
create=True, file_mode="wb", create_parent_dir=True,
434
delay_create=True, dir_mode=0777)
435
self.assertEqual([], transport.calls)
438
(u"a", ["option"], 0, 1, [u"b"]),
439
(u"a", ["opt"], 1, 2, [u"c"]),
440
(u"b", ["option"], 2, 3, [u"a"])
442
name, (filename, f), kwargs = transport.calls.pop(0)
443
self.assertEqual("put_file_non_atomic", name)
445
{"dir_mode": 0777, "create_parent_dir": True, "mode": "wb"},
447
self.assertEqual("filename", filename)
450
"\na option 0 1 .b :"
452
"\nb option 2 3 0 :",
455
def test_has_version(self):
456
transport = MockTransport([
460
index = _KnitIndex(transport, "filename", "r")
462
self.assertTrue(index.has_version(u"a"))
463
self.assertFalse(index.has_version(u"b"))
465
def test_get_position(self):
466
transport = MockTransport([
471
index = _KnitIndex(transport, "filename", "r")
473
self.assertEqual((0, 1), index.get_position(u"a"))
474
self.assertEqual((1, 2), index.get_position(u"b"))
476
def test_get_method(self):
477
transport = MockTransport([
479
"a fulltext,unknown 0 1 :",
480
"b unknown,line-delta 1 2 :",
483
index = _KnitIndex(transport, "filename", "r")
485
self.assertEqual("fulltext", index.get_method(u"a"))
486
self.assertEqual("line-delta", index.get_method(u"b"))
487
self.assertRaises(AssertionError, index.get_method, u"c")
489
def test_get_options(self):
490
transport = MockTransport([
495
index = _KnitIndex(transport, "filename", "r")
497
self.assertEqual(["opt1"], index.get_options(u"a"))
498
self.assertEqual(["opt2", "opt3"], index.get_options(u"b"))
500
def test_get_parents(self):
501
transport = MockTransport([
504
"b option 1 2 0 .c :",
505
"c option 1 2 1 0 .e :"
507
index = _KnitIndex(transport, "filename", "r")
509
self.assertEqual([], index.get_parents(u"a"))
510
self.assertEqual([u"a", u"c"], index.get_parents(u"b"))
511
self.assertEqual([u"b", u"a"], index.get_parents(u"c"))
513
def test_get_parents_with_ghosts(self):
514
transport = MockTransport([
517
"b option 1 2 0 .c :",
518
"c option 1 2 1 0 .e :"
520
index = _KnitIndex(transport, "filename", "r")
522
self.assertEqual([], index.get_parents_with_ghosts(u"a"))
523
self.assertEqual([u"a", u"c"], index.get_parents_with_ghosts(u"b"))
524
self.assertEqual([u"b", u"a", u"e"],
525
index.get_parents_with_ghosts(u"c"))
527
def test_check_versions_present(self):
528
transport = MockTransport([
533
index = _KnitIndex(transport, "filename", "r")
535
check = index.check_versions_present
541
self.assertRaises(RevisionNotPresent, check, [u"c"])
542
self.assertRaises(RevisionNotPresent, check, [u"a", u"b", u"c"])
545
class KnitTests(TestCaseWithTransport):
546
"""Class containing knit test helper routines."""
548
def make_test_knit(self, annotate=False, delay_create=False):
550
factory = KnitPlainFactory()
553
return KnitVersionedFile('test', get_transport('.'), access_mode='w',
554
factory=factory, create=True,
555
delay_create=delay_create)
558
class BasicKnitTests(KnitTests):
560
def add_stock_one_and_one_a(self, k):
561
k.add_lines('text-1', [], split_lines(TEXT_1))
562
k.add_lines('text-1a', ['text-1'], split_lines(TEXT_1A))
564
def test_knit_constructor(self):
565
"""Construct empty k"""
566
self.make_test_knit()
568
def test_knit_add(self):
569
"""Store one text in knit and retrieve"""
570
k = self.make_test_knit()
571
k.add_lines('text-1', [], split_lines(TEXT_1))
572
self.assertTrue(k.has_version('text-1'))
573
self.assertEqualDiff(''.join(k.get_lines('text-1')), TEXT_1)
575
def test_knit_reload(self):
576
# test that the content in a reloaded knit is correct
577
k = self.make_test_knit()
578
k.add_lines('text-1', [], split_lines(TEXT_1))
580
k2 = KnitVersionedFile('test', get_transport('.'), access_mode='r', factory=KnitPlainFactory(), create=True)
581
self.assertTrue(k2.has_version('text-1'))
582
self.assertEqualDiff(''.join(k2.get_lines('text-1')), TEXT_1)
584
def test_knit_several(self):
585
"""Store several texts in a knit"""
586
k = self.make_test_knit()
587
k.add_lines('text-1', [], split_lines(TEXT_1))
588
k.add_lines('text-2', [], split_lines(TEXT_2))
589
self.assertEqualDiff(''.join(k.get_lines('text-1')), TEXT_1)
590
self.assertEqualDiff(''.join(k.get_lines('text-2')), TEXT_2)
592
def test_repeated_add(self):
593
"""Knit traps attempt to replace existing version"""
594
k = self.make_test_knit()
595
k.add_lines('text-1', [], split_lines(TEXT_1))
596
self.assertRaises(RevisionAlreadyPresent,
598
'text-1', [], split_lines(TEXT_1))
600
def test_empty(self):
601
k = self.make_test_knit(True)
602
k.add_lines('text-1', [], [])
603
self.assertEquals(k.get_lines('text-1'), [])
605
def test_incomplete(self):
606
"""Test if texts without a ending line-end can be inserted and
608
k = KnitVersionedFile('test', get_transport('.'), delta=False, create=True)
609
k.add_lines('text-1', [], ['a\n', 'b' ])
610
k.add_lines('text-2', ['text-1'], ['a\rb\n', 'b\n'])
611
# reopening ensures maximum room for confusion
612
k = KnitVersionedFile('test', get_transport('.'), delta=False, create=True)
613
self.assertEquals(k.get_lines('text-1'), ['a\n', 'b' ])
614
self.assertEquals(k.get_lines('text-2'), ['a\rb\n', 'b\n'])
616
def test_delta(self):
617
"""Expression of knit delta as lines"""
618
k = self.make_test_knit()
619
td = list(line_delta(TEXT_1.splitlines(True),
620
TEXT_1A.splitlines(True)))
621
self.assertEqualDiff(''.join(td), delta_1_1a)
622
out = apply_line_delta(TEXT_1.splitlines(True), td)
623
self.assertEqualDiff(''.join(out), TEXT_1A)
625
def test_add_with_parents(self):
626
"""Store in knit with parents"""
627
k = self.make_test_knit()
628
self.add_stock_one_and_one_a(k)
629
self.assertEquals(k.get_parents('text-1'), [])
630
self.assertEquals(k.get_parents('text-1a'), ['text-1'])
632
def test_ancestry(self):
633
"""Store in knit with parents"""
634
k = self.make_test_knit()
635
self.add_stock_one_and_one_a(k)
636
self.assertEquals(set(k.get_ancestry(['text-1a'])), set(['text-1a', 'text-1']))
638
def test_add_delta(self):
639
"""Store in knit with parents"""
640
k = KnitVersionedFile('test', get_transport('.'), factory=KnitPlainFactory(),
641
delta=True, create=True)
642
self.add_stock_one_and_one_a(k)
644
self.assertEqualDiff(''.join(k.get_lines('text-1a')), TEXT_1A)
646
def test_annotate(self):
648
k = KnitVersionedFile('knit', get_transport('.'), factory=KnitAnnotateFactory(),
649
delta=True, create=True)
650
self.insert_and_test_small_annotate(k)
652
def insert_and_test_small_annotate(self, k):
653
"""test annotation with k works correctly."""
654
k.add_lines('text-1', [], ['a\n', 'b\n'])
655
k.add_lines('text-2', ['text-1'], ['a\n', 'c\n'])
657
origins = k.annotate('text-2')
658
self.assertEquals(origins[0], ('text-1', 'a\n'))
659
self.assertEquals(origins[1], ('text-2', 'c\n'))
661
def test_annotate_fulltext(self):
663
k = KnitVersionedFile('knit', get_transport('.'), factory=KnitAnnotateFactory(),
664
delta=False, create=True)
665
self.insert_and_test_small_annotate(k)
667
def test_annotate_merge_1(self):
668
k = self.make_test_knit(True)
669
k.add_lines('text-a1', [], ['a\n', 'b\n'])
670
k.add_lines('text-a2', [], ['d\n', 'c\n'])
671
k.add_lines('text-am', ['text-a1', 'text-a2'], ['d\n', 'b\n'])
672
origins = k.annotate('text-am')
673
self.assertEquals(origins[0], ('text-a2', 'd\n'))
674
self.assertEquals(origins[1], ('text-a1', 'b\n'))
676
def test_annotate_merge_2(self):
677
k = self.make_test_knit(True)
678
k.add_lines('text-a1', [], ['a\n', 'b\n', 'c\n'])
679
k.add_lines('text-a2', [], ['x\n', 'y\n', 'z\n'])
680
k.add_lines('text-am', ['text-a1', 'text-a2'], ['a\n', 'y\n', 'c\n'])
681
origins = k.annotate('text-am')
682
self.assertEquals(origins[0], ('text-a1', 'a\n'))
683
self.assertEquals(origins[1], ('text-a2', 'y\n'))
684
self.assertEquals(origins[2], ('text-a1', 'c\n'))
686
def test_annotate_merge_9(self):
687
k = self.make_test_knit(True)
688
k.add_lines('text-a1', [], ['a\n', 'b\n', 'c\n'])
689
k.add_lines('text-a2', [], ['x\n', 'y\n', 'z\n'])
690
k.add_lines('text-am', ['text-a1', 'text-a2'], ['k\n', 'y\n', 'c\n'])
691
origins = k.annotate('text-am')
692
self.assertEquals(origins[0], ('text-am', 'k\n'))
693
self.assertEquals(origins[1], ('text-a2', 'y\n'))
694
self.assertEquals(origins[2], ('text-a1', 'c\n'))
696
def test_annotate_merge_3(self):
697
k = self.make_test_knit(True)
698
k.add_lines('text-a1', [], ['a\n', 'b\n', 'c\n'])
699
k.add_lines('text-a2', [] ,['x\n', 'y\n', 'z\n'])
700
k.add_lines('text-am', ['text-a1', 'text-a2'], ['k\n', 'y\n', 'z\n'])
701
origins = k.annotate('text-am')
702
self.assertEquals(origins[0], ('text-am', 'k\n'))
703
self.assertEquals(origins[1], ('text-a2', 'y\n'))
704
self.assertEquals(origins[2], ('text-a2', 'z\n'))
706
def test_annotate_merge_4(self):
707
k = self.make_test_knit(True)
708
k.add_lines('text-a1', [], ['a\n', 'b\n', 'c\n'])
709
k.add_lines('text-a2', [], ['x\n', 'y\n', 'z\n'])
710
k.add_lines('text-a3', ['text-a1'], ['a\n', 'b\n', 'p\n'])
711
k.add_lines('text-am', ['text-a2', 'text-a3'], ['a\n', 'b\n', 'z\n'])
712
origins = k.annotate('text-am')
713
self.assertEquals(origins[0], ('text-a1', 'a\n'))
714
self.assertEquals(origins[1], ('text-a1', 'b\n'))
715
self.assertEquals(origins[2], ('text-a2', 'z\n'))
717
def test_annotate_merge_5(self):
718
k = self.make_test_knit(True)
719
k.add_lines('text-a1', [], ['a\n', 'b\n', 'c\n'])
720
k.add_lines('text-a2', [], ['d\n', 'e\n', 'f\n'])
721
k.add_lines('text-a3', [], ['x\n', 'y\n', 'z\n'])
722
k.add_lines('text-am',
723
['text-a1', 'text-a2', 'text-a3'],
724
['a\n', 'e\n', 'z\n'])
725
origins = k.annotate('text-am')
726
self.assertEquals(origins[0], ('text-a1', 'a\n'))
727
self.assertEquals(origins[1], ('text-a2', 'e\n'))
728
self.assertEquals(origins[2], ('text-a3', 'z\n'))
730
def test_annotate_file_cherry_pick(self):
731
k = self.make_test_knit(True)
732
k.add_lines('text-1', [], ['a\n', 'b\n', 'c\n'])
733
k.add_lines('text-2', ['text-1'], ['d\n', 'e\n', 'f\n'])
734
k.add_lines('text-3', ['text-2', 'text-1'], ['a\n', 'b\n', 'c\n'])
735
origins = k.annotate('text-3')
736
self.assertEquals(origins[0], ('text-1', 'a\n'))
737
self.assertEquals(origins[1], ('text-1', 'b\n'))
738
self.assertEquals(origins[2], ('text-1', 'c\n'))
740
def test_knit_join(self):
741
"""Store in knit with parents"""
742
k1 = KnitVersionedFile('test1', get_transport('.'), factory=KnitPlainFactory(), create=True)
743
k1.add_lines('text-a', [], split_lines(TEXT_1))
744
k1.add_lines('text-b', ['text-a'], split_lines(TEXT_1))
746
k1.add_lines('text-c', [], split_lines(TEXT_1))
747
k1.add_lines('text-d', ['text-c'], split_lines(TEXT_1))
749
k1.add_lines('text-m', ['text-b', 'text-d'], split_lines(TEXT_1))
751
k2 = KnitVersionedFile('test2', get_transport('.'), factory=KnitPlainFactory(), create=True)
752
count = k2.join(k1, version_ids=['text-m'])
753
self.assertEquals(count, 5)
754
self.assertTrue(k2.has_version('text-a'))
755
self.assertTrue(k2.has_version('text-c'))
757
def test_reannotate(self):
758
k1 = KnitVersionedFile('knit1', get_transport('.'),
759
factory=KnitAnnotateFactory(), create=True)
761
k1.add_lines('text-a', [], ['a\n', 'b\n'])
763
k1.add_lines('text-b', ['text-a'], ['a\n', 'c\n'])
765
k2 = KnitVersionedFile('test2', get_transport('.'),
766
factory=KnitAnnotateFactory(), create=True)
767
k2.join(k1, version_ids=['text-b'])
770
k1.add_lines('text-X', ['text-b'], ['a\n', 'b\n'])
772
k2.add_lines('text-c', ['text-b'], ['z\n', 'c\n'])
774
k2.add_lines('text-Y', ['text-b'], ['b\n', 'c\n'])
776
# test-c will have index 3
777
k1.join(k2, version_ids=['text-c'])
779
lines = k1.get_lines('text-c')
780
self.assertEquals(lines, ['z\n', 'c\n'])
782
origins = k1.annotate('text-c')
783
self.assertEquals(origins[0], ('text-c', 'z\n'))
784
self.assertEquals(origins[1], ('text-b', 'c\n'))
786
def test_get_line_delta_texts(self):
787
"""Make sure we can call get_texts on text with reused line deltas"""
788
k1 = KnitVersionedFile('test1', get_transport('.'),
789
factory=KnitPlainFactory(), create=True)
794
parents = ['%d' % (t-1)]
795
k1.add_lines('%d' % t, parents, ['hello\n'] * t)
796
k1.get_texts(('%d' % t) for t in range(3))
798
def test_iter_lines_reads_in_order(self):
799
t = MemoryTransport()
800
instrumented_t = TransportLogger(t)
801
k1 = KnitVersionedFile('id', instrumented_t, create=True, delta=True)
802
self.assertEqual([('id.kndx',)], instrumented_t._calls)
803
# add texts with no required ordering
804
k1.add_lines('base', [], ['text\n'])
805
k1.add_lines('base2', [], ['text2\n'])
807
instrumented_t._calls = []
808
# request a last-first iteration
809
results = list(k1.iter_lines_added_or_present_in_versions(['base2', 'base']))
810
self.assertEqual([('id.knit', [(0, 87), (87, 89)])], instrumented_t._calls)
811
self.assertEqual(['text\n', 'text2\n'], results)
813
def test_create_empty_annotated(self):
814
k1 = self.make_test_knit(True)
816
k1.add_lines('text-a', [], ['a\n', 'b\n'])
817
k2 = k1.create_empty('t', MemoryTransport())
818
self.assertTrue(isinstance(k2.factory, KnitAnnotateFactory))
819
self.assertEqual(k1.delta, k2.delta)
820
# the generic test checks for empty content and file class
822
def test_knit_format(self):
823
# this tests that a new knit index file has the expected content
824
# and that is writes the data we expect as records are added.
825
knit = self.make_test_knit(True)
826
# Now knit files are not created until we first add data to them
827
self.assertFileEqual("# bzr knit index 8\n", 'test.kndx')
828
knit.add_lines_with_ghosts('revid', ['a_ghost'], ['a\n'])
829
self.assertFileEqual(
830
"# bzr knit index 8\n"
832
"revid fulltext 0 84 .a_ghost :",
834
knit.add_lines_with_ghosts('revid2', ['revid'], ['a\n'])
835
self.assertFileEqual(
836
"# bzr knit index 8\n"
837
"\nrevid fulltext 0 84 .a_ghost :"
838
"\nrevid2 line-delta 84 82 0 :",
840
# we should be able to load this file again
841
knit = KnitVersionedFile('test', get_transport('.'), access_mode='r')
842
self.assertEqual(['revid', 'revid2'], knit.versions())
843
# write a short write to the file and ensure that its ignored
844
indexfile = file('test.kndx', 'at')
845
indexfile.write('\nrevid3 line-delta 166 82 1 2 3 4 5 .phwoar:demo ')
847
# we should be able to load this file again
848
knit = KnitVersionedFile('test', get_transport('.'), access_mode='w')
849
self.assertEqual(['revid', 'revid2'], knit.versions())
850
# and add a revision with the same id the failed write had
851
knit.add_lines('revid3', ['revid2'], ['a\n'])
852
# and when reading it revid3 should now appear.
853
knit = KnitVersionedFile('test', get_transport('.'), access_mode='r')
854
self.assertEqual(['revid', 'revid2', 'revid3'], knit.versions())
855
self.assertEqual(['revid2'], knit.get_parents('revid3'))
857
def test_delay_create(self):
858
"""Test that passing delay_create=True creates files late"""
859
knit = self.make_test_knit(annotate=True, delay_create=True)
860
self.failIfExists('test.knit')
861
self.failIfExists('test.kndx')
862
knit.add_lines_with_ghosts('revid', ['a_ghost'], ['a\n'])
863
self.failUnlessExists('test.knit')
864
self.assertFileEqual(
865
"# bzr knit index 8\n"
867
"revid fulltext 0 84 .a_ghost :",
870
def test_create_parent_dir(self):
871
"""create_parent_dir can create knits in nonexistant dirs"""
872
# Has no effect if we don't set 'delay_create'
873
trans = get_transport('.')
874
self.assertRaises(NoSuchFile, KnitVersionedFile, 'dir/test',
875
trans, access_mode='w', factory=None,
876
create=True, create_parent_dir=True)
877
# Nothing should have changed yet
878
knit = KnitVersionedFile('dir/test', trans, access_mode='w',
879
factory=None, create=True,
880
create_parent_dir=True,
882
self.failIfExists('dir/test.knit')
883
self.failIfExists('dir/test.kndx')
884
self.failIfExists('dir')
885
knit.add_lines('revid', [], ['a\n'])
886
self.failUnlessExists('dir')
887
self.failUnlessExists('dir/test.knit')
888
self.assertFileEqual(
889
"# bzr knit index 8\n"
891
"revid fulltext 0 84 :",
894
def test_create_mode_700(self):
895
trans = get_transport('.')
896
if not trans._can_roundtrip_unix_modebits():
897
# Can't roundtrip, so no need to run this test
899
knit = KnitVersionedFile('dir/test', trans, access_mode='w',
900
factory=None, create=True,
901
create_parent_dir=True,
905
knit.add_lines('revid', [], ['a\n'])
906
self.assertTransportMode(trans, 'dir', 0700)
907
self.assertTransportMode(trans, 'dir/test.knit', 0600)
908
self.assertTransportMode(trans, 'dir/test.kndx', 0600)
910
def test_create_mode_770(self):
911
trans = get_transport('.')
912
if not trans._can_roundtrip_unix_modebits():
913
# Can't roundtrip, so no need to run this test
915
knit = KnitVersionedFile('dir/test', trans, access_mode='w',
916
factory=None, create=True,
917
create_parent_dir=True,
921
knit.add_lines('revid', [], ['a\n'])
922
self.assertTransportMode(trans, 'dir', 0770)
923
self.assertTransportMode(trans, 'dir/test.knit', 0660)
924
self.assertTransportMode(trans, 'dir/test.kndx', 0660)
926
def test_create_mode_777(self):
927
trans = get_transport('.')
928
if not trans._can_roundtrip_unix_modebits():
929
# Can't roundtrip, so no need to run this test
931
knit = KnitVersionedFile('dir/test', trans, access_mode='w',
932
factory=None, create=True,
933
create_parent_dir=True,
937
knit.add_lines('revid', [], ['a\n'])
938
self.assertTransportMode(trans, 'dir', 0777)
939
self.assertTransportMode(trans, 'dir/test.knit', 0666)
940
self.assertTransportMode(trans, 'dir/test.kndx', 0666)
942
def test_plan_merge(self):
943
my_knit = self.make_test_knit(annotate=True)
944
my_knit.add_lines('text1', [], split_lines(TEXT_1))
945
my_knit.add_lines('text1a', ['text1'], split_lines(TEXT_1A))
946
my_knit.add_lines('text1b', ['text1'], split_lines(TEXT_1B))
947
plan = list(my_knit.plan_merge('text1a', 'text1b'))
948
for plan_line, expected_line in zip(plan, AB_MERGE):
949
self.assertEqual(plan_line, expected_line)
961
Banana cup cake recipe
971
Banana cup cake recipe
973
- bananas (do not use plantains!!!)
980
Banana cup cake recipe
996
AB_MERGE_TEXT="""unchanged|Banana cup cake recipe
1001
new-b|- bananas (do not use plantains!!!)
1002
unchanged|- broken tea cups
1003
new-a|- self-raising flour
1006
AB_MERGE=[tuple(l.split('|')) for l in AB_MERGE_TEXT.splitlines(True)]
1009
def line_delta(from_lines, to_lines):
1010
"""Generate line-based delta from one text to another"""
1011
s = difflib.SequenceMatcher(None, from_lines, to_lines)
1012
for op in s.get_opcodes():
1013
if op[0] == 'equal':
1015
yield '%d,%d,%d\n' % (op[1], op[2], op[4]-op[3])
1016
for i in range(op[3], op[4]):
1020
def apply_line_delta(basis_lines, delta_lines):
1021
"""Apply a line-based perfect diff
1023
basis_lines -- text to apply the patch to
1024
delta_lines -- diff instructions and content
1026
out = basis_lines[:]
1029
while i < len(delta_lines):
1031
a, b, c = map(long, l.split(','))
1033
out[offset+a:offset+b] = delta_lines[i:i+c]
1035
offset = offset + (b - a) + c
1039
class TestWeaveToKnit(KnitTests):
1041
def test_weave_to_knit_matches(self):
1042
# check that the WeaveToKnit is_compatible function
1043
# registers True for a Weave to a Knit.
1045
k = self.make_test_knit()
1046
self.failUnless(WeaveToKnit.is_compatible(w, k))
1047
self.failIf(WeaveToKnit.is_compatible(k, w))
1048
self.failIf(WeaveToKnit.is_compatible(w, w))
1049
self.failIf(WeaveToKnit.is_compatible(k, k))
1052
class TestKnitCaching(KnitTests):
1054
def create_knit(self, cache_add=False):
1055
k = self.make_test_knit(True)
1059
k.add_lines('text-1', [], split_lines(TEXT_1))
1060
k.add_lines('text-2', [], split_lines(TEXT_2))
1063
def test_no_caching(self):
1064
k = self.create_knit()
1065
# Nothing should be cached without setting 'enable_cache'
1066
self.assertEqual({}, k._data._cache)
1068
def test_cache_add_and_clear(self):
1069
k = self.create_knit(True)
1071
self.assertEqual(['text-1', 'text-2'], sorted(k._data._cache.keys()))
1074
self.assertEqual({}, k._data._cache)
1076
def test_cache_data_read_raw(self):
1077
k = self.create_knit()
1079
# Now cache and read
1082
def read_one_raw(version):
1083
pos_map = k._get_components_positions([version])
1084
method, pos, size, next = pos_map[version]
1085
lst = list(k._data.read_records_iter_raw([(version, pos, size)]))
1086
self.assertEqual(1, len(lst))
1089
val = read_one_raw('text-1')
1090
self.assertEqual({'text-1':val[1]}, k._data._cache)
1093
# After clear, new reads are not cached
1094
self.assertEqual({}, k._data._cache)
1096
val2 = read_one_raw('text-1')
1097
self.assertEqual(val, val2)
1098
self.assertEqual({}, k._data._cache)
1100
def test_cache_data_read(self):
1101
k = self.create_knit()
1103
def read_one(version):
1104
pos_map = k._get_components_positions([version])
1105
method, pos, size, next = pos_map[version]
1106
lst = list(k._data.read_records_iter([(version, pos, size)]))
1107
self.assertEqual(1, len(lst))
1110
# Now cache and read
1113
val = read_one('text-2')
1114
self.assertEqual(['text-2'], k._data._cache.keys())
1115
self.assertEqual('text-2', val[0])
1116
content, digest = k._data._parse_record('text-2',
1117
k._data._cache['text-2'])
1118
self.assertEqual(content, val[1])
1119
self.assertEqual(digest, val[2])
1122
self.assertEqual({}, k._data._cache)
1124
val2 = read_one('text-2')
1125
self.assertEqual(val, val2)
1126
self.assertEqual({}, k._data._cache)
1128
def test_cache_read(self):
1129
k = self.create_knit()
1132
text = k.get_text('text-1')
1133
self.assertEqual(TEXT_1, text)
1134
self.assertEqual(['text-1'], k._data._cache.keys())
1137
self.assertEqual({}, k._data._cache)
1139
text = k.get_text('text-1')
1140
self.assertEqual(TEXT_1, text)
1141
self.assertEqual({}, k._data._cache)
1144
class TestKnitIndex(KnitTests):
1146
def test_add_versions_dictionary_compresses(self):
1147
"""Adding versions to the index should update the lookup dict"""
1148
knit = self.make_test_knit()
1150
idx.add_version('a-1', ['fulltext'], 0, 0, [])
1151
self.check_file_contents('test.kndx',
1152
'# bzr knit index 8\n'
1154
'a-1 fulltext 0 0 :'
1156
idx.add_versions([('a-2', ['fulltext'], 0, 0, ['a-1']),
1157
('a-3', ['fulltext'], 0, 0, ['a-2']),
1159
self.check_file_contents('test.kndx',
1160
'# bzr knit index 8\n'
1162
'a-1 fulltext 0 0 :\n'
1163
'a-2 fulltext 0 0 0 :\n'
1164
'a-3 fulltext 0 0 1 :'
1166
self.assertEqual(['a-1', 'a-2', 'a-3'], idx._history)
1167
self.assertEqual({'a-1':('a-1', ['fulltext'], 0, 0, [], 0),
1168
'a-2':('a-2', ['fulltext'], 0, 0, ['a-1'], 1),
1169
'a-3':('a-3', ['fulltext'], 0, 0, ['a-2'], 2),
1172
def test_add_versions_fails_clean(self):
1173
"""If add_versions fails in the middle, it restores a pristine state.
1175
Any modifications that are made to the index are reset if all versions
1178
# This cheats a little bit by passing in a generator which will
1179
# raise an exception before the processing finishes
1180
# Other possibilities would be to have an version with the wrong number
1181
# of entries, or to make the backing transport unable to write any
1184
knit = self.make_test_knit()
1186
idx.add_version('a-1', ['fulltext'], 0, 0, [])
1188
class StopEarly(Exception):
1191
def generate_failure():
1192
"""Add some entries and then raise an exception"""
1193
yield ('a-2', ['fulltext'], 0, 0, ['a-1'])
1194
yield ('a-3', ['fulltext'], 0, 0, ['a-2'])
1197
# Assert the pre-condition
1198
self.assertEqual(['a-1'], idx._history)
1199
self.assertEqual({'a-1':('a-1', ['fulltext'], 0, 0, [], 0)}, idx._cache)
1201
self.assertRaises(StopEarly, idx.add_versions, generate_failure())
1203
# And it shouldn't be modified
1204
self.assertEqual(['a-1'], idx._history)
1205
self.assertEqual({'a-1':('a-1', ['fulltext'], 0, 0, [], 0)}, idx._cache)
1207
def test_knit_index_ignores_empty_files(self):
1208
# There was a race condition in older bzr, where a ^C at the right time
1209
# could leave an empty .kndx file, which bzr would later claim was a
1210
# corrupted file since the header was not present. In reality, the file
1211
# just wasn't created, so it should be ignored.
1212
t = get_transport('.')
1213
t.put_bytes('test.kndx', '')
1215
knit = self.make_test_knit()
1217
def test_knit_index_checks_header(self):
1218
t = get_transport('.')
1219
t.put_bytes('test.kndx', '# not really a knit header\n\n')
1221
self.assertRaises(KnitHeaderError, self.make_test_knit)