1
# Copyright (C) 2005, 2006, 2007 Canonical Ltd
3
# This program is free software; you can redistribute it and/or modify
4
# it under the terms of the GNU General Public License as published by
5
# the Free Software Foundation; either version 2 of the License, or
6
# (at your option) any later version.
8
# This program is distributed in the hope that it will be useful,
9
# but WITHOUT ANY WARRANTY; without even the implied warranty of
10
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11
# GNU General Public License for more details.
13
# You should have received a copy of the GNU General Public License
14
# along with this program; if not, write to the Free Software
15
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
17
"""Tests for Knit data structure"""
19
from cStringIO import StringIO
30
from bzrlib.errors import (
31
RevisionAlreadyPresent,
36
from bzrlib.knit import (
46
from bzrlib.osutils import split_lines
47
from bzrlib.tests import TestCase, TestCaseWithTransport, Feature
48
from bzrlib.transport import TransportLogger, get_transport
49
from bzrlib.transport.memory import MemoryTransport
50
from bzrlib.weave import Weave
53
class _CompiledKnitFeature(Feature):
57
import bzrlib._knit_load_data_c
62
def feature_name(self):
63
return 'bzrlib._knit_load_data_c'
65
CompiledKnitFeature = _CompiledKnitFeature()
68
class KnitContentTests(TestCase):
70
def test_constructor(self):
71
content = KnitContent([])
74
content = KnitContent([])
75
self.assertEqual(content.text(), [])
77
content = KnitContent([("origin1", "text1"), ("origin2", "text2")])
78
self.assertEqual(content.text(), ["text1", "text2"])
80
def test_annotate(self):
81
content = KnitContent([])
82
self.assertEqual(content.annotate(), [])
84
content = KnitContent([("origin1", "text1"), ("origin2", "text2")])
85
self.assertEqual(content.annotate(),
86
[("origin1", "text1"), ("origin2", "text2")])
88
def test_annotate_iter(self):
89
content = KnitContent([])
90
it = content.annotate_iter()
91
self.assertRaises(StopIteration, it.next)
93
content = KnitContent([("origin1", "text1"), ("origin2", "text2")])
94
it = content.annotate_iter()
95
self.assertEqual(it.next(), ("origin1", "text1"))
96
self.assertEqual(it.next(), ("origin2", "text2"))
97
self.assertRaises(StopIteration, it.next)
100
content = KnitContent([("origin1", "text1"), ("origin2", "text2")])
101
copy = content.copy()
102
self.assertIsInstance(copy, KnitContent)
103
self.assertEqual(copy.annotate(),
104
[("origin1", "text1"), ("origin2", "text2")])
106
def test_line_delta(self):
107
content1 = KnitContent([("", "a"), ("", "b")])
108
content2 = KnitContent([("", "a"), ("", "a"), ("", "c")])
109
self.assertEqual(content1.line_delta(content2),
110
[(1, 2, 2, [("", "a"), ("", "c")])])
112
def test_line_delta_iter(self):
113
content1 = KnitContent([("", "a"), ("", "b")])
114
content2 = KnitContent([("", "a"), ("", "a"), ("", "c")])
115
it = content1.line_delta_iter(content2)
116
self.assertEqual(it.next(), (1, 2, 2, [("", "a"), ("", "c")]))
117
self.assertRaises(StopIteration, it.next)
120
class MockTransport(object):
122
def __init__(self, file_lines=None):
123
self.file_lines = file_lines
125
# We have no base directory for the MockTransport
128
def get(self, filename):
129
if self.file_lines is None:
130
raise NoSuchFile(filename)
132
return StringIO("\n".join(self.file_lines))
134
def readv(self, relpath, offsets):
135
fp = self.get(relpath)
136
for offset, size in offsets:
138
yield offset, fp.read(size)
140
def __getattr__(self, name):
141
def queue_call(*args, **kwargs):
142
self.calls.append((name, args, kwargs))
146
class LowLevelKnitDataTests(TestCase):
148
def create_gz_content(self, text):
150
gz_file = gzip.GzipFile(mode='wb', fileobj=sio)
153
return sio.getvalue()
155
def test_valid_knit_data(self):
156
sha1sum = sha.new('foo\nbar\n').hexdigest()
157
gz_txt = self.create_gz_content('version rev-id-1 2 %s\n'
162
transport = MockTransport([gz_txt])
163
data = _KnitData(transport, 'filename', mode='r')
164
records = [('rev-id-1', 0, len(gz_txt))]
166
contents = data.read_records(records)
167
self.assertEqual({'rev-id-1':(['foo\n', 'bar\n'], sha1sum)}, contents)
169
raw_contents = list(data.read_records_iter_raw(records))
170
self.assertEqual([('rev-id-1', gz_txt)], raw_contents)
172
def test_not_enough_lines(self):
173
sha1sum = sha.new('foo\n').hexdigest()
174
# record says 2 lines data says 1
175
gz_txt = self.create_gz_content('version rev-id-1 2 %s\n'
179
transport = MockTransport([gz_txt])
180
data = _KnitData(transport, 'filename', mode='r')
181
records = [('rev-id-1', 0, len(gz_txt))]
182
self.assertRaises(errors.KnitCorrupt, data.read_records, records)
184
# read_records_iter_raw won't detect that sort of mismatch/corruption
185
raw_contents = list(data.read_records_iter_raw(records))
186
self.assertEqual([('rev-id-1', gz_txt)], raw_contents)
188
def test_too_many_lines(self):
189
sha1sum = sha.new('foo\nbar\n').hexdigest()
190
# record says 1 lines data says 2
191
gz_txt = self.create_gz_content('version rev-id-1 1 %s\n'
196
transport = MockTransport([gz_txt])
197
data = _KnitData(transport, 'filename', mode='r')
198
records = [('rev-id-1', 0, len(gz_txt))]
199
self.assertRaises(errors.KnitCorrupt, data.read_records, records)
201
# read_records_iter_raw won't detect that sort of mismatch/corruption
202
raw_contents = list(data.read_records_iter_raw(records))
203
self.assertEqual([('rev-id-1', gz_txt)], raw_contents)
205
def test_mismatched_version_id(self):
206
sha1sum = sha.new('foo\nbar\n').hexdigest()
207
gz_txt = self.create_gz_content('version rev-id-1 2 %s\n'
212
transport = MockTransport([gz_txt])
213
data = _KnitData(transport, 'filename', mode='r')
214
# We are asking for rev-id-2, but the data is rev-id-1
215
records = [('rev-id-2', 0, len(gz_txt))]
216
self.assertRaises(errors.KnitCorrupt, data.read_records, records)
218
# read_records_iter_raw will notice if we request the wrong version.
219
self.assertRaises(errors.KnitCorrupt, list,
220
data.read_records_iter_raw(records))
222
def test_uncompressed_data(self):
223
sha1sum = sha.new('foo\nbar\n').hexdigest()
224
txt = ('version rev-id-1 2 %s\n'
229
transport = MockTransport([txt])
230
data = _KnitData(transport, 'filename', mode='r')
231
records = [('rev-id-1', 0, len(txt))]
233
# We don't have valid gzip data ==> corrupt
234
self.assertRaises(errors.KnitCorrupt, data.read_records, records)
236
# read_records_iter_raw will notice the bad data
237
self.assertRaises(errors.KnitCorrupt, list,
238
data.read_records_iter_raw(records))
240
def test_corrupted_data(self):
241
sha1sum = sha.new('foo\nbar\n').hexdigest()
242
gz_txt = self.create_gz_content('version rev-id-1 2 %s\n'
247
# Change 2 bytes in the middle to \xff
248
gz_txt = gz_txt[:10] + '\xff\xff' + gz_txt[12:]
249
transport = MockTransport([gz_txt])
250
data = _KnitData(transport, 'filename', mode='r')
251
records = [('rev-id-1', 0, len(gz_txt))]
253
self.assertRaises(errors.KnitCorrupt, data.read_records, records)
255
# read_records_iter_raw will notice if we request the wrong version.
256
self.assertRaises(errors.KnitCorrupt, list,
257
data.read_records_iter_raw(records))
260
class LowLevelKnitIndexTests(TestCase):
262
def get_knit_index(self, *args, **kwargs):
263
orig = knit._load_data
265
knit._load_data = orig
266
self.addCleanup(reset)
267
from bzrlib._knit_load_data_py import _load_data_py
268
knit._load_data = _load_data_py
269
return _KnitIndex(*args, **kwargs)
271
def test_no_such_file(self):
272
transport = MockTransport()
274
self.assertRaises(NoSuchFile, self.get_knit_index,
275
transport, "filename", "r")
276
self.assertRaises(NoSuchFile, self.get_knit_index,
277
transport, "filename", "w", create=False)
279
def test_create_file(self):
280
transport = MockTransport()
282
index = self.get_knit_index(transport, "filename", "w",
283
file_mode="wb", create=True)
285
("put_bytes_non_atomic",
286
("filename", index.HEADER), {"mode": "wb"}),
287
transport.calls.pop(0))
289
def test_delay_create_file(self):
290
transport = MockTransport()
292
index = self.get_knit_index(transport, "filename", "w",
293
create=True, file_mode="wb", create_parent_dir=True,
294
delay_create=True, dir_mode=0777)
295
self.assertEqual([], transport.calls)
297
index.add_versions([])
298
name, (filename, f), kwargs = transport.calls.pop(0)
299
self.assertEqual("put_file_non_atomic", name)
301
{"dir_mode": 0777, "create_parent_dir": True, "mode": "wb"},
303
self.assertEqual("filename", filename)
304
self.assertEqual(index.HEADER, f.read())
306
index.add_versions([])
307
self.assertEqual(("append_bytes", ("filename", ""), {}),
308
transport.calls.pop(0))
310
def test_read_utf8_version_id(self):
311
unicode_revision_id = u"version-\N{CYRILLIC CAPITAL LETTER A}"
312
utf8_revision_id = unicode_revision_id.encode('utf-8')
313
transport = MockTransport([
315
'%s option 0 1 :' % (utf8_revision_id,)
317
index = self.get_knit_index(transport, "filename", "r")
318
# _KnitIndex is a private class, and deals in utf8 revision_ids, not
319
# Unicode revision_ids.
320
self.assertTrue(index.has_version(utf8_revision_id))
321
self.assertFalse(index.has_version(unicode_revision_id))
323
def test_read_utf8_parents(self):
324
unicode_revision_id = u"version-\N{CYRILLIC CAPITAL LETTER A}"
325
utf8_revision_id = unicode_revision_id.encode('utf-8')
326
transport = MockTransport([
328
"version option 0 1 .%s :" % (utf8_revision_id,)
330
index = self.get_knit_index(transport, "filename", "r")
331
self.assertEqual([utf8_revision_id],
332
index.get_parents_with_ghosts("version"))
334
def test_read_ignore_corrupted_lines(self):
335
transport = MockTransport([
338
"corrupted options 0 1 .b .c ",
339
"version options 0 1 :"
341
index = self.get_knit_index(transport, "filename", "r")
342
self.assertEqual(1, index.num_versions())
343
self.assertTrue(index.has_version("version"))
345
def test_read_corrupted_header(self):
346
transport = MockTransport(['not a bzr knit index header\n'])
347
self.assertRaises(KnitHeaderError,
348
self.get_knit_index, transport, "filename", "r")
350
def test_read_duplicate_entries(self):
351
transport = MockTransport([
353
"parent options 0 1 :",
354
"version options1 0 1 0 :",
355
"version options2 1 2 .other :",
356
"version options3 3 4 0 .other :"
358
index = self.get_knit_index(transport, "filename", "r")
359
self.assertEqual(2, index.num_versions())
360
self.assertEqual(1, index.lookup("version"))
361
self.assertEqual((3, 4), index.get_position("version"))
362
self.assertEqual(["options3"], index.get_options("version"))
363
self.assertEqual(["parent", "other"],
364
index.get_parents_with_ghosts("version"))
366
def test_read_compressed_parents(self):
367
transport = MockTransport([
371
"c option 0 1 1 0 :",
373
index = self.get_knit_index(transport, "filename", "r")
374
self.assertEqual(["a"], index.get_parents("b"))
375
self.assertEqual(["b", "a"], index.get_parents("c"))
377
def test_write_utf8_version_id(self):
378
unicode_revision_id = u"version-\N{CYRILLIC CAPITAL LETTER A}"
379
utf8_revision_id = unicode_revision_id.encode('utf-8')
380
transport = MockTransport([
383
index = self.get_knit_index(transport, "filename", "r")
384
index.add_version(utf8_revision_id, ["option"], 0, 1, [])
385
self.assertEqual(("append_bytes", ("filename",
386
"\n%s option 0 1 :" % (utf8_revision_id,)),
388
transport.calls.pop(0))
390
def test_write_utf8_parents(self):
391
unicode_revision_id = u"version-\N{CYRILLIC CAPITAL LETTER A}"
392
utf8_revision_id = unicode_revision_id.encode('utf-8')
393
transport = MockTransport([
396
index = self.get_knit_index(transport, "filename", "r")
397
index.add_version("version", ["option"], 0, 1, [utf8_revision_id])
398
self.assertEqual(("append_bytes", ("filename",
399
"\nversion option 0 1 .%s :" % (utf8_revision_id,)),
401
transport.calls.pop(0))
403
def test_get_graph(self):
404
transport = MockTransport()
405
index = self.get_knit_index(transport, "filename", "w", create=True)
406
self.assertEqual([], index.get_graph())
408
index.add_version("a", ["option"], 0, 1, ["b"])
409
self.assertEqual([("a", ["b"])], index.get_graph())
411
index.add_version("c", ["option"], 0, 1, ["d"])
412
self.assertEqual([("a", ["b"]), ("c", ["d"])],
413
sorted(index.get_graph()))
415
def test_get_ancestry(self):
416
transport = MockTransport([
419
"b option 0 1 0 .e :",
420
"c option 0 1 1 0 :",
421
"d option 0 1 2 .f :"
423
index = self.get_knit_index(transport, "filename", "r")
425
self.assertEqual([], index.get_ancestry([]))
426
self.assertEqual(["a"], index.get_ancestry(["a"]))
427
self.assertEqual(["a", "b"], index.get_ancestry(["b"]))
428
self.assertEqual(["a", "b", "c"], index.get_ancestry(["c"]))
429
self.assertEqual(["a", "b", "c", "d"], index.get_ancestry(["d"]))
430
self.assertEqual(["a", "b"], index.get_ancestry(["a", "b"]))
431
self.assertEqual(["a", "b", "c"], index.get_ancestry(["a", "c"]))
433
self.assertRaises(RevisionNotPresent, index.get_ancestry, ["e"])
435
def test_get_ancestry_with_ghosts(self):
436
transport = MockTransport([
439
"b option 0 1 0 .e :",
440
"c option 0 1 0 .f .g :",
441
"d option 0 1 2 .h .j .k :"
443
index = self.get_knit_index(transport, "filename", "r")
445
self.assertEqual([], index.get_ancestry_with_ghosts([]))
446
self.assertEqual(["a"], index.get_ancestry_with_ghosts(["a"]))
447
self.assertEqual(["a", "e", "b"],
448
index.get_ancestry_with_ghosts(["b"]))
449
self.assertEqual(["a", "g", "f", "c"],
450
index.get_ancestry_with_ghosts(["c"]))
451
self.assertEqual(["a", "g", "f", "c", "k", "j", "h", "d"],
452
index.get_ancestry_with_ghosts(["d"]))
453
self.assertEqual(["a", "e", "b"],
454
index.get_ancestry_with_ghosts(["a", "b"]))
455
self.assertEqual(["a", "g", "f", "c"],
456
index.get_ancestry_with_ghosts(["a", "c"]))
458
["a", "g", "f", "c", "e", "b", "k", "j", "h", "d"],
459
index.get_ancestry_with_ghosts(["b", "d"]))
461
self.assertRaises(RevisionNotPresent,
462
index.get_ancestry_with_ghosts, ["e"])
464
def test_num_versions(self):
465
transport = MockTransport([
468
index = self.get_knit_index(transport, "filename", "r")
470
self.assertEqual(0, index.num_versions())
471
self.assertEqual(0, len(index))
473
index.add_version("a", ["option"], 0, 1, [])
474
self.assertEqual(1, index.num_versions())
475
self.assertEqual(1, len(index))
477
index.add_version("a", ["option2"], 1, 2, [])
478
self.assertEqual(1, index.num_versions())
479
self.assertEqual(1, len(index))
481
index.add_version("b", ["option"], 0, 1, [])
482
self.assertEqual(2, index.num_versions())
483
self.assertEqual(2, len(index))
485
def test_get_versions(self):
486
transport = MockTransport([
489
index = self.get_knit_index(transport, "filename", "r")
491
self.assertEqual([], index.get_versions())
493
index.add_version("a", ["option"], 0, 1, [])
494
self.assertEqual(["a"], index.get_versions())
496
index.add_version("a", ["option"], 0, 1, [])
497
self.assertEqual(["a"], index.get_versions())
499
index.add_version("b", ["option"], 0, 1, [])
500
self.assertEqual(["a", "b"], index.get_versions())
502
def test_idx_to_name(self):
503
transport = MockTransport([
508
index = self.get_knit_index(transport, "filename", "r")
510
self.assertEqual("a", index.idx_to_name(0))
511
self.assertEqual("b", index.idx_to_name(1))
512
self.assertEqual("b", index.idx_to_name(-1))
513
self.assertEqual("a", index.idx_to_name(-2))
515
def test_lookup(self):
516
transport = MockTransport([
521
index = self.get_knit_index(transport, "filename", "r")
523
self.assertEqual(0, index.lookup("a"))
524
self.assertEqual(1, index.lookup("b"))
526
def test_add_version(self):
527
transport = MockTransport([
530
index = self.get_knit_index(transport, "filename", "r")
532
index.add_version("a", ["option"], 0, 1, ["b"])
533
self.assertEqual(("append_bytes",
534
("filename", "\na option 0 1 .b :"),
535
{}), transport.calls.pop(0))
536
self.assertTrue(index.has_version("a"))
537
self.assertEqual(1, index.num_versions())
538
self.assertEqual((0, 1), index.get_position("a"))
539
self.assertEqual(["option"], index.get_options("a"))
540
self.assertEqual(["b"], index.get_parents_with_ghosts("a"))
542
index.add_version("a", ["opt"], 1, 2, ["c"])
543
self.assertEqual(("append_bytes",
544
("filename", "\na opt 1 2 .c :"),
545
{}), transport.calls.pop(0))
546
self.assertTrue(index.has_version("a"))
547
self.assertEqual(1, index.num_versions())
548
self.assertEqual((1, 2), index.get_position("a"))
549
self.assertEqual(["opt"], index.get_options("a"))
550
self.assertEqual(["c"], index.get_parents_with_ghosts("a"))
552
index.add_version("b", ["option"], 2, 3, ["a"])
553
self.assertEqual(("append_bytes",
554
("filename", "\nb option 2 3 0 :"),
555
{}), transport.calls.pop(0))
556
self.assertTrue(index.has_version("b"))
557
self.assertEqual(2, index.num_versions())
558
self.assertEqual((2, 3), index.get_position("b"))
559
self.assertEqual(["option"], index.get_options("b"))
560
self.assertEqual(["a"], index.get_parents_with_ghosts("b"))
562
def test_add_versions(self):
563
transport = MockTransport([
566
index = self.get_knit_index(transport, "filename", "r")
569
("a", ["option"], 0, 1, ["b"]),
570
("a", ["opt"], 1, 2, ["c"]),
571
("b", ["option"], 2, 3, ["a"])
573
self.assertEqual(("append_bytes", ("filename",
574
"\na option 0 1 .b :"
577
), {}), transport.calls.pop(0))
578
self.assertTrue(index.has_version("a"))
579
self.assertTrue(index.has_version("b"))
580
self.assertEqual(2, index.num_versions())
581
self.assertEqual((1, 2), index.get_position("a"))
582
self.assertEqual((2, 3), index.get_position("b"))
583
self.assertEqual(["opt"], index.get_options("a"))
584
self.assertEqual(["option"], index.get_options("b"))
585
self.assertEqual(["c"], index.get_parents_with_ghosts("a"))
586
self.assertEqual(["a"], index.get_parents_with_ghosts("b"))
588
def test_delay_create_and_add_versions(self):
589
transport = MockTransport()
591
index = self.get_knit_index(transport, "filename", "w",
592
create=True, file_mode="wb", create_parent_dir=True,
593
delay_create=True, dir_mode=0777)
594
self.assertEqual([], transport.calls)
597
("a", ["option"], 0, 1, ["b"]),
598
("a", ["opt"], 1, 2, ["c"]),
599
("b", ["option"], 2, 3, ["a"])
601
name, (filename, f), kwargs = transport.calls.pop(0)
602
self.assertEqual("put_file_non_atomic", name)
604
{"dir_mode": 0777, "create_parent_dir": True, "mode": "wb"},
606
self.assertEqual("filename", filename)
609
"\na option 0 1 .b :"
611
"\nb option 2 3 0 :",
614
def test_has_version(self):
615
transport = MockTransport([
619
index = self.get_knit_index(transport, "filename", "r")
621
self.assertTrue(index.has_version("a"))
622
self.assertFalse(index.has_version("b"))
624
def test_get_position(self):
625
transport = MockTransport([
630
index = self.get_knit_index(transport, "filename", "r")
632
self.assertEqual((0, 1), index.get_position("a"))
633
self.assertEqual((1, 2), index.get_position("b"))
635
def test_get_method(self):
636
transport = MockTransport([
638
"a fulltext,unknown 0 1 :",
639
"b unknown,line-delta 1 2 :",
642
index = self.get_knit_index(transport, "filename", "r")
644
self.assertEqual("fulltext", index.get_method("a"))
645
self.assertEqual("line-delta", index.get_method("b"))
646
self.assertRaises(errors.KnitIndexUnknownMethod, index.get_method, "c")
648
def test_get_options(self):
649
transport = MockTransport([
654
index = self.get_knit_index(transport, "filename", "r")
656
self.assertEqual(["opt1"], index.get_options("a"))
657
self.assertEqual(["opt2", "opt3"], index.get_options("b"))
659
def test_get_parents(self):
660
transport = MockTransport([
663
"b option 1 2 0 .c :",
664
"c option 1 2 1 0 .e :"
666
index = self.get_knit_index(transport, "filename", "r")
668
self.assertEqual([], index.get_parents("a"))
669
self.assertEqual(["a", "c"], index.get_parents("b"))
670
self.assertEqual(["b", "a"], index.get_parents("c"))
672
def test_get_parents_with_ghosts(self):
673
transport = MockTransport([
676
"b option 1 2 0 .c :",
677
"c option 1 2 1 0 .e :"
679
index = self.get_knit_index(transport, "filename", "r")
681
self.assertEqual([], index.get_parents_with_ghosts("a"))
682
self.assertEqual(["a", "c"], index.get_parents_with_ghosts("b"))
683
self.assertEqual(["b", "a", "e"],
684
index.get_parents_with_ghosts("c"))
686
def test_check_versions_present(self):
687
transport = MockTransport([
692
index = self.get_knit_index(transport, "filename", "r")
694
check = index.check_versions_present
700
self.assertRaises(RevisionNotPresent, check, ["c"])
701
self.assertRaises(RevisionNotPresent, check, ["a", "b", "c"])
703
def test_impossible_parent(self):
704
"""Test we get KnitCorrupt if the parent couldn't possibly exist."""
705
transport = MockTransport([
708
"b option 0 1 4 :" # We don't have a 4th record
711
self.assertRaises(errors.KnitCorrupt,
712
self.get_knit_index, transport, 'filename', 'r')
714
if (str(e) == ('exceptions must be strings, classes, or instances,'
715
' not exceptions.IndexError')
716
and sys.version_info[0:2] >= (2,5)):
717
self.knownFailure('Pyrex <0.9.5 fails with TypeError when'
718
' raising new style exceptions with python'
723
def test_corrupted_parent(self):
724
transport = MockTransport([
728
"c option 0 1 1v :", # Can't have a parent of '1v'
731
self.assertRaises(errors.KnitCorrupt,
732
self.get_knit_index, transport, 'filename', 'r')
734
if (str(e) == ('exceptions must be strings, classes, or instances,'
735
' not exceptions.ValueError')
736
and sys.version_info[0:2] >= (2,5)):
737
self.knownFailure('Pyrex <0.9.5 fails with TypeError when'
738
' raising new style exceptions with python'
743
def test_corrupted_parent_in_list(self):
744
transport = MockTransport([
748
"c option 0 1 1 v :", # Can't have a parent of 'v'
751
self.assertRaises(errors.KnitCorrupt,
752
self.get_knit_index, transport, 'filename', 'r')
754
if (str(e) == ('exceptions must be strings, classes, or instances,'
755
' not exceptions.ValueError')
756
and sys.version_info[0:2] >= (2,5)):
757
self.knownFailure('Pyrex <0.9.5 fails with TypeError when'
758
' raising new style exceptions with python'
763
def test_invalid_position(self):
764
transport = MockTransport([
769
self.assertRaises(errors.KnitCorrupt,
770
self.get_knit_index, transport, 'filename', 'r')
772
if (str(e) == ('exceptions must be strings, classes, or instances,'
773
' not exceptions.ValueError')
774
and sys.version_info[0:2] >= (2,5)):
775
self.knownFailure('Pyrex <0.9.5 fails with TypeError when'
776
' raising new style exceptions with python'
781
def test_invalid_size(self):
782
transport = MockTransport([
787
self.assertRaises(errors.KnitCorrupt,
788
self.get_knit_index, transport, 'filename', 'r')
790
if (str(e) == ('exceptions must be strings, classes, or instances,'
791
' not exceptions.ValueError')
792
and sys.version_info[0:2] >= (2,5)):
793
self.knownFailure('Pyrex <0.9.5 fails with TypeError when'
794
' raising new style exceptions with python'
799
def test_short_line(self):
800
transport = MockTransport([
803
"b option 10 10 0", # This line isn't terminated, ignored
805
index = self.get_knit_index(transport, "filename", "r")
806
self.assertEqual(['a'], index.get_versions())
808
def test_skip_incomplete_record(self):
809
# A line with bogus data should just be skipped
810
transport = MockTransport([
813
"b option 10 10 0", # This line isn't terminated, ignored
814
"c option 20 10 0 :", # Properly terminated, and starts with '\n'
816
index = self.get_knit_index(transport, "filename", "r")
817
self.assertEqual(['a', 'c'], index.get_versions())
819
def test_trailing_characters(self):
820
# A line with bogus data should just be skipped
821
transport = MockTransport([
824
"b option 10 10 0 :a", # This line has extra trailing characters
825
"c option 20 10 0 :", # Properly terminated, and starts with '\n'
827
index = self.get_knit_index(transport, "filename", "r")
828
self.assertEqual(['a', 'c'], index.get_versions())
831
class LowLevelKnitIndexTests_c(LowLevelKnitIndexTests):
833
_test_needs_features = [CompiledKnitFeature]
835
def get_knit_index(self, *args, **kwargs):
836
orig = knit._load_data
838
knit._load_data = orig
839
self.addCleanup(reset)
840
from bzrlib._knit_load_data_c import _load_data_c
841
knit._load_data = _load_data_c
842
return _KnitIndex(*args, **kwargs)
846
class KnitTests(TestCaseWithTransport):
847
"""Class containing knit test helper routines."""
849
def make_test_knit(self, annotate=False, delay_create=False):
851
factory = KnitPlainFactory()
854
return KnitVersionedFile('test', get_transport('.'), access_mode='w',
855
factory=factory, create=True,
856
delay_create=delay_create)
859
class BasicKnitTests(KnitTests):
861
def add_stock_one_and_one_a(self, k):
862
k.add_lines('text-1', [], split_lines(TEXT_1))
863
k.add_lines('text-1a', ['text-1'], split_lines(TEXT_1A))
865
def test_knit_constructor(self):
866
"""Construct empty k"""
867
self.make_test_knit()
869
def test_knit_add(self):
870
"""Store one text in knit and retrieve"""
871
k = self.make_test_knit()
872
k.add_lines('text-1', [], split_lines(TEXT_1))
873
self.assertTrue(k.has_version('text-1'))
874
self.assertEqualDiff(''.join(k.get_lines('text-1')), TEXT_1)
876
def test_knit_reload(self):
877
# test that the content in a reloaded knit is correct
878
k = self.make_test_knit()
879
k.add_lines('text-1', [], split_lines(TEXT_1))
881
k2 = KnitVersionedFile('test', get_transport('.'), access_mode='r', factory=KnitPlainFactory(), create=True)
882
self.assertTrue(k2.has_version('text-1'))
883
self.assertEqualDiff(''.join(k2.get_lines('text-1')), TEXT_1)
885
def test_knit_several(self):
886
"""Store several texts in a knit"""
887
k = self.make_test_knit()
888
k.add_lines('text-1', [], split_lines(TEXT_1))
889
k.add_lines('text-2', [], split_lines(TEXT_2))
890
self.assertEqualDiff(''.join(k.get_lines('text-1')), TEXT_1)
891
self.assertEqualDiff(''.join(k.get_lines('text-2')), TEXT_2)
893
def test_repeated_add(self):
894
"""Knit traps attempt to replace existing version"""
895
k = self.make_test_knit()
896
k.add_lines('text-1', [], split_lines(TEXT_1))
897
self.assertRaises(RevisionAlreadyPresent,
899
'text-1', [], split_lines(TEXT_1))
901
def test_empty(self):
902
k = self.make_test_knit(True)
903
k.add_lines('text-1', [], [])
904
self.assertEquals(k.get_lines('text-1'), [])
906
def test_incomplete(self):
907
"""Test if texts without a ending line-end can be inserted and
909
k = KnitVersionedFile('test', get_transport('.'), delta=False, create=True)
910
k.add_lines('text-1', [], ['a\n', 'b' ])
911
k.add_lines('text-2', ['text-1'], ['a\rb\n', 'b\n'])
912
# reopening ensures maximum room for confusion
913
k = KnitVersionedFile('test', get_transport('.'), delta=False, create=True)
914
self.assertEquals(k.get_lines('text-1'), ['a\n', 'b' ])
915
self.assertEquals(k.get_lines('text-2'), ['a\rb\n', 'b\n'])
917
def test_delta(self):
918
"""Expression of knit delta as lines"""
919
k = self.make_test_knit()
921
td = list(line_delta(TEXT_1.splitlines(True),
922
TEXT_1A.splitlines(True)))
923
self.assertEqualDiff(''.join(td), delta_1_1a)
924
out = apply_line_delta(TEXT_1.splitlines(True), td)
925
self.assertEqualDiff(''.join(out), TEXT_1A)
927
def assertDerivedBlocksEqual(self, source, target, noeol=False):
928
"""Assert that the derived matching blocks match real output"""
929
source_lines = source.splitlines(True)
930
target_lines = target.splitlines(True)
932
if noeol and not line.endswith('\n'):
936
source_content = KnitContent([(None, nl(l)) for l in source_lines])
937
target_content = KnitContent([(None, nl(l)) for l in target_lines])
938
line_delta = source_content.line_delta(target_content)
939
delta_blocks = list(KnitContent.get_line_delta_blocks(line_delta,
940
source_lines, target_lines))
941
matcher = KnitSequenceMatcher(None, source_lines, target_lines)
942
matcher_blocks = list(list(matcher.get_matching_blocks()))
943
self.assertEqual(matcher_blocks, delta_blocks)
945
def test_get_line_delta_blocks(self):
946
self.assertDerivedBlocksEqual('a\nb\nc\n', 'q\nc\n')
947
self.assertDerivedBlocksEqual(TEXT_1, TEXT_1)
948
self.assertDerivedBlocksEqual(TEXT_1, TEXT_1A)
949
self.assertDerivedBlocksEqual(TEXT_1, TEXT_1B)
950
self.assertDerivedBlocksEqual(TEXT_1B, TEXT_1A)
951
self.assertDerivedBlocksEqual(TEXT_1A, TEXT_1B)
952
self.assertDerivedBlocksEqual(TEXT_1A, '')
953
self.assertDerivedBlocksEqual('', TEXT_1A)
954
self.assertDerivedBlocksEqual('', '')
955
self.assertDerivedBlocksEqual('a\nb\nc', 'a\nb\nc\nd')
957
def test_get_line_delta_blocks_noeol(self):
958
"""Handle historical knit deltas safely
960
Some existing knit deltas don't consider the last line to differ
961
when the only difference whether it has a final newline.
963
New knit deltas appear to always consider the last line to differ
966
self.assertDerivedBlocksEqual('a\nb\nc', 'a\nb\nc\nd\n', noeol=True)
967
self.assertDerivedBlocksEqual('a\nb\nc\nd\n', 'a\nb\nc', noeol=True)
968
self.assertDerivedBlocksEqual('a\nb\nc\n', 'a\nb\nc', noeol=True)
969
self.assertDerivedBlocksEqual('a\nb\nc', 'a\nb\nc\n', noeol=True)
971
def test_add_with_parents(self):
972
"""Store in knit with parents"""
973
k = self.make_test_knit()
974
self.add_stock_one_and_one_a(k)
975
self.assertEquals(k.get_parents('text-1'), [])
976
self.assertEquals(k.get_parents('text-1a'), ['text-1'])
978
def test_ancestry(self):
979
"""Store in knit with parents"""
980
k = self.make_test_knit()
981
self.add_stock_one_and_one_a(k)
982
self.assertEquals(set(k.get_ancestry(['text-1a'])), set(['text-1a', 'text-1']))
984
def test_add_delta(self):
985
"""Store in knit with parents"""
986
k = KnitVersionedFile('test', get_transport('.'), factory=KnitPlainFactory(),
987
delta=True, create=True)
988
self.add_stock_one_and_one_a(k)
990
self.assertEqualDiff(''.join(k.get_lines('text-1a')), TEXT_1A)
992
def test_annotate(self):
994
k = KnitVersionedFile('knit', get_transport('.'), factory=KnitAnnotateFactory(),
995
delta=True, create=True)
996
self.insert_and_test_small_annotate(k)
998
def insert_and_test_small_annotate(self, k):
999
"""test annotation with k works correctly."""
1000
k.add_lines('text-1', [], ['a\n', 'b\n'])
1001
k.add_lines('text-2', ['text-1'], ['a\n', 'c\n'])
1003
origins = k.annotate('text-2')
1004
self.assertEquals(origins[0], ('text-1', 'a\n'))
1005
self.assertEquals(origins[1], ('text-2', 'c\n'))
1007
def test_annotate_fulltext(self):
1009
k = KnitVersionedFile('knit', get_transport('.'), factory=KnitAnnotateFactory(),
1010
delta=False, create=True)
1011
self.insert_and_test_small_annotate(k)
1013
def test_annotate_merge_1(self):
1014
k = self.make_test_knit(True)
1015
k.add_lines('text-a1', [], ['a\n', 'b\n'])
1016
k.add_lines('text-a2', [], ['d\n', 'c\n'])
1017
k.add_lines('text-am', ['text-a1', 'text-a2'], ['d\n', 'b\n'])
1018
origins = k.annotate('text-am')
1019
self.assertEquals(origins[0], ('text-a2', 'd\n'))
1020
self.assertEquals(origins[1], ('text-a1', 'b\n'))
1022
def test_annotate_merge_2(self):
1023
k = self.make_test_knit(True)
1024
k.add_lines('text-a1', [], ['a\n', 'b\n', 'c\n'])
1025
k.add_lines('text-a2', [], ['x\n', 'y\n', 'z\n'])
1026
k.add_lines('text-am', ['text-a1', 'text-a2'], ['a\n', 'y\n', 'c\n'])
1027
origins = k.annotate('text-am')
1028
self.assertEquals(origins[0], ('text-a1', 'a\n'))
1029
self.assertEquals(origins[1], ('text-a2', 'y\n'))
1030
self.assertEquals(origins[2], ('text-a1', 'c\n'))
1032
def test_annotate_merge_9(self):
1033
k = self.make_test_knit(True)
1034
k.add_lines('text-a1', [], ['a\n', 'b\n', 'c\n'])
1035
k.add_lines('text-a2', [], ['x\n', 'y\n', 'z\n'])
1036
k.add_lines('text-am', ['text-a1', 'text-a2'], ['k\n', 'y\n', 'c\n'])
1037
origins = k.annotate('text-am')
1038
self.assertEquals(origins[0], ('text-am', 'k\n'))
1039
self.assertEquals(origins[1], ('text-a2', 'y\n'))
1040
self.assertEquals(origins[2], ('text-a1', 'c\n'))
1042
def test_annotate_merge_3(self):
1043
k = self.make_test_knit(True)
1044
k.add_lines('text-a1', [], ['a\n', 'b\n', 'c\n'])
1045
k.add_lines('text-a2', [] ,['x\n', 'y\n', 'z\n'])
1046
k.add_lines('text-am', ['text-a1', 'text-a2'], ['k\n', 'y\n', 'z\n'])
1047
origins = k.annotate('text-am')
1048
self.assertEquals(origins[0], ('text-am', 'k\n'))
1049
self.assertEquals(origins[1], ('text-a2', 'y\n'))
1050
self.assertEquals(origins[2], ('text-a2', 'z\n'))
1052
def test_annotate_merge_4(self):
1053
k = self.make_test_knit(True)
1054
k.add_lines('text-a1', [], ['a\n', 'b\n', 'c\n'])
1055
k.add_lines('text-a2', [], ['x\n', 'y\n', 'z\n'])
1056
k.add_lines('text-a3', ['text-a1'], ['a\n', 'b\n', 'p\n'])
1057
k.add_lines('text-am', ['text-a2', 'text-a3'], ['a\n', 'b\n', 'z\n'])
1058
origins = k.annotate('text-am')
1059
self.assertEquals(origins[0], ('text-a1', 'a\n'))
1060
self.assertEquals(origins[1], ('text-a1', 'b\n'))
1061
self.assertEquals(origins[2], ('text-a2', 'z\n'))
1063
def test_annotate_merge_5(self):
1064
k = self.make_test_knit(True)
1065
k.add_lines('text-a1', [], ['a\n', 'b\n', 'c\n'])
1066
k.add_lines('text-a2', [], ['d\n', 'e\n', 'f\n'])
1067
k.add_lines('text-a3', [], ['x\n', 'y\n', 'z\n'])
1068
k.add_lines('text-am',
1069
['text-a1', 'text-a2', 'text-a3'],
1070
['a\n', 'e\n', 'z\n'])
1071
origins = k.annotate('text-am')
1072
self.assertEquals(origins[0], ('text-a1', 'a\n'))
1073
self.assertEquals(origins[1], ('text-a2', 'e\n'))
1074
self.assertEquals(origins[2], ('text-a3', 'z\n'))
1076
def test_annotate_file_cherry_pick(self):
1077
k = self.make_test_knit(True)
1078
k.add_lines('text-1', [], ['a\n', 'b\n', 'c\n'])
1079
k.add_lines('text-2', ['text-1'], ['d\n', 'e\n', 'f\n'])
1080
k.add_lines('text-3', ['text-2', 'text-1'], ['a\n', 'b\n', 'c\n'])
1081
origins = k.annotate('text-3')
1082
self.assertEquals(origins[0], ('text-1', 'a\n'))
1083
self.assertEquals(origins[1], ('text-1', 'b\n'))
1084
self.assertEquals(origins[2], ('text-1', 'c\n'))
1086
def test_knit_join(self):
1087
"""Store in knit with parents"""
1088
k1 = KnitVersionedFile('test1', get_transport('.'), factory=KnitPlainFactory(), create=True)
1089
k1.add_lines('text-a', [], split_lines(TEXT_1))
1090
k1.add_lines('text-b', ['text-a'], split_lines(TEXT_1))
1092
k1.add_lines('text-c', [], split_lines(TEXT_1))
1093
k1.add_lines('text-d', ['text-c'], split_lines(TEXT_1))
1095
k1.add_lines('text-m', ['text-b', 'text-d'], split_lines(TEXT_1))
1097
k2 = KnitVersionedFile('test2', get_transport('.'), factory=KnitPlainFactory(), create=True)
1098
count = k2.join(k1, version_ids=['text-m'])
1099
self.assertEquals(count, 5)
1100
self.assertTrue(k2.has_version('text-a'))
1101
self.assertTrue(k2.has_version('text-c'))
1103
def test_reannotate(self):
1104
k1 = KnitVersionedFile('knit1', get_transport('.'),
1105
factory=KnitAnnotateFactory(), create=True)
1107
k1.add_lines('text-a', [], ['a\n', 'b\n'])
1109
k1.add_lines('text-b', ['text-a'], ['a\n', 'c\n'])
1111
k2 = KnitVersionedFile('test2', get_transport('.'),
1112
factory=KnitAnnotateFactory(), create=True)
1113
k2.join(k1, version_ids=['text-b'])
1116
k1.add_lines('text-X', ['text-b'], ['a\n', 'b\n'])
1118
k2.add_lines('text-c', ['text-b'], ['z\n', 'c\n'])
1120
k2.add_lines('text-Y', ['text-b'], ['b\n', 'c\n'])
1122
# test-c will have index 3
1123
k1.join(k2, version_ids=['text-c'])
1125
lines = k1.get_lines('text-c')
1126
self.assertEquals(lines, ['z\n', 'c\n'])
1128
origins = k1.annotate('text-c')
1129
self.assertEquals(origins[0], ('text-c', 'z\n'))
1130
self.assertEquals(origins[1], ('text-b', 'c\n'))
1132
def test_get_line_delta_texts(self):
1133
"""Make sure we can call get_texts on text with reused line deltas"""
1134
k1 = KnitVersionedFile('test1', get_transport('.'),
1135
factory=KnitPlainFactory(), create=True)
1140
parents = ['%d' % (t-1)]
1141
k1.add_lines('%d' % t, parents, ['hello\n'] * t)
1142
k1.get_texts(('%d' % t) for t in range(3))
1144
def test_iter_lines_reads_in_order(self):
1145
t = MemoryTransport()
1146
instrumented_t = TransportLogger(t)
1147
k1 = KnitVersionedFile('id', instrumented_t, create=True, delta=True)
1148
self.assertEqual([('id.kndx',)], instrumented_t._calls)
1149
# add texts with no required ordering
1150
k1.add_lines('base', [], ['text\n'])
1151
k1.add_lines('base2', [], ['text2\n'])
1153
instrumented_t._calls = []
1154
# request a last-first iteration
1155
results = list(k1.iter_lines_added_or_present_in_versions(['base2', 'base']))
1156
self.assertEqual([('id.knit', [(0, 87), (87, 89)])], instrumented_t._calls)
1157
self.assertEqual(['text\n', 'text2\n'], results)
1159
def test_create_empty_annotated(self):
1160
k1 = self.make_test_knit(True)
1162
k1.add_lines('text-a', [], ['a\n', 'b\n'])
1163
k2 = k1.create_empty('t', MemoryTransport())
1164
self.assertTrue(isinstance(k2.factory, KnitAnnotateFactory))
1165
self.assertEqual(k1.delta, k2.delta)
1166
# the generic test checks for empty content and file class
1168
def test_knit_format(self):
1169
# this tests that a new knit index file has the expected content
1170
# and that is writes the data we expect as records are added.
1171
knit = self.make_test_knit(True)
1172
# Now knit files are not created until we first add data to them
1173
self.assertFileEqual("# bzr knit index 8\n", 'test.kndx')
1174
knit.add_lines_with_ghosts('revid', ['a_ghost'], ['a\n'])
1175
self.assertFileEqual(
1176
"# bzr knit index 8\n"
1178
"revid fulltext 0 84 .a_ghost :",
1180
knit.add_lines_with_ghosts('revid2', ['revid'], ['a\n'])
1181
self.assertFileEqual(
1182
"# bzr knit index 8\n"
1183
"\nrevid fulltext 0 84 .a_ghost :"
1184
"\nrevid2 line-delta 84 82 0 :",
1186
# we should be able to load this file again
1187
knit = KnitVersionedFile('test', get_transport('.'), access_mode='r')
1188
self.assertEqual(['revid', 'revid2'], knit.versions())
1189
# write a short write to the file and ensure that its ignored
1190
indexfile = file('test.kndx', 'ab')
1191
indexfile.write('\nrevid3 line-delta 166 82 1 2 3 4 5 .phwoar:demo ')
1193
# we should be able to load this file again
1194
knit = KnitVersionedFile('test', get_transport('.'), access_mode='w')
1195
self.assertEqual(['revid', 'revid2'], knit.versions())
1196
# and add a revision with the same id the failed write had
1197
knit.add_lines('revid3', ['revid2'], ['a\n'])
1198
# and when reading it revid3 should now appear.
1199
knit = KnitVersionedFile('test', get_transport('.'), access_mode='r')
1200
self.assertEqual(['revid', 'revid2', 'revid3'], knit.versions())
1201
self.assertEqual(['revid2'], knit.get_parents('revid3'))
1203
def test_delay_create(self):
1204
"""Test that passing delay_create=True creates files late"""
1205
knit = self.make_test_knit(annotate=True, delay_create=True)
1206
self.failIfExists('test.knit')
1207
self.failIfExists('test.kndx')
1208
knit.add_lines_with_ghosts('revid', ['a_ghost'], ['a\n'])
1209
self.failUnlessExists('test.knit')
1210
self.assertFileEqual(
1211
"# bzr knit index 8\n"
1213
"revid fulltext 0 84 .a_ghost :",
1216
def test_create_parent_dir(self):
1217
"""create_parent_dir can create knits in nonexistant dirs"""
1218
# Has no effect if we don't set 'delay_create'
1219
trans = get_transport('.')
1220
self.assertRaises(NoSuchFile, KnitVersionedFile, 'dir/test',
1221
trans, access_mode='w', factory=None,
1222
create=True, create_parent_dir=True)
1223
# Nothing should have changed yet
1224
knit = KnitVersionedFile('dir/test', trans, access_mode='w',
1225
factory=None, create=True,
1226
create_parent_dir=True,
1228
self.failIfExists('dir/test.knit')
1229
self.failIfExists('dir/test.kndx')
1230
self.failIfExists('dir')
1231
knit.add_lines('revid', [], ['a\n'])
1232
self.failUnlessExists('dir')
1233
self.failUnlessExists('dir/test.knit')
1234
self.assertFileEqual(
1235
"# bzr knit index 8\n"
1237
"revid fulltext 0 84 :",
1240
def test_create_mode_700(self):
1241
trans = get_transport('.')
1242
if not trans._can_roundtrip_unix_modebits():
1243
# Can't roundtrip, so no need to run this test
1245
knit = KnitVersionedFile('dir/test', trans, access_mode='w',
1246
factory=None, create=True,
1247
create_parent_dir=True,
1251
knit.add_lines('revid', [], ['a\n'])
1252
self.assertTransportMode(trans, 'dir', 0700)
1253
self.assertTransportMode(trans, 'dir/test.knit', 0600)
1254
self.assertTransportMode(trans, 'dir/test.kndx', 0600)
1256
def test_create_mode_770(self):
1257
trans = get_transport('.')
1258
if not trans._can_roundtrip_unix_modebits():
1259
# Can't roundtrip, so no need to run this test
1261
knit = KnitVersionedFile('dir/test', trans, access_mode='w',
1262
factory=None, create=True,
1263
create_parent_dir=True,
1267
knit.add_lines('revid', [], ['a\n'])
1268
self.assertTransportMode(trans, 'dir', 0770)
1269
self.assertTransportMode(trans, 'dir/test.knit', 0660)
1270
self.assertTransportMode(trans, 'dir/test.kndx', 0660)
1272
def test_create_mode_777(self):
1273
trans = get_transport('.')
1274
if not trans._can_roundtrip_unix_modebits():
1275
# Can't roundtrip, so no need to run this test
1277
knit = KnitVersionedFile('dir/test', trans, access_mode='w',
1278
factory=None, create=True,
1279
create_parent_dir=True,
1283
knit.add_lines('revid', [], ['a\n'])
1284
self.assertTransportMode(trans, 'dir', 0777)
1285
self.assertTransportMode(trans, 'dir/test.knit', 0666)
1286
self.assertTransportMode(trans, 'dir/test.kndx', 0666)
1288
def test_plan_merge(self):
1289
my_knit = self.make_test_knit(annotate=True)
1290
my_knit.add_lines('text1', [], split_lines(TEXT_1))
1291
my_knit.add_lines('text1a', ['text1'], split_lines(TEXT_1A))
1292
my_knit.add_lines('text1b', ['text1'], split_lines(TEXT_1B))
1293
plan = list(my_knit.plan_merge('text1a', 'text1b'))
1294
for plan_line, expected_line in zip(plan, AB_MERGE):
1295
self.assertEqual(plan_line, expected_line)
1307
Banana cup cake recipe
1313
- self-raising flour
1317
Banana cup cake recipe
1319
- bananas (do not use plantains!!!)
1326
Banana cup cake recipe
1329
- self-raising flour
1342
AB_MERGE_TEXT="""unchanged|Banana cup cake recipe
1347
new-b|- bananas (do not use plantains!!!)
1348
unchanged|- broken tea cups
1349
new-a|- self-raising flour
1352
AB_MERGE=[tuple(l.split('|')) for l in AB_MERGE_TEXT.splitlines(True)]
1355
def line_delta(from_lines, to_lines):
1356
"""Generate line-based delta from one text to another"""
1357
s = difflib.SequenceMatcher(None, from_lines, to_lines)
1358
for op in s.get_opcodes():
1359
if op[0] == 'equal':
1361
yield '%d,%d,%d\n' % (op[1], op[2], op[4]-op[3])
1362
for i in range(op[3], op[4]):
1366
def apply_line_delta(basis_lines, delta_lines):
1367
"""Apply a line-based perfect diff
1369
basis_lines -- text to apply the patch to
1370
delta_lines -- diff instructions and content
1372
out = basis_lines[:]
1375
while i < len(delta_lines):
1377
a, b, c = map(long, l.split(','))
1379
out[offset+a:offset+b] = delta_lines[i:i+c]
1381
offset = offset + (b - a) + c
1385
class TestWeaveToKnit(KnitTests):
1387
def test_weave_to_knit_matches(self):
1388
# check that the WeaveToKnit is_compatible function
1389
# registers True for a Weave to a Knit.
1391
k = self.make_test_knit()
1392
self.failUnless(WeaveToKnit.is_compatible(w, k))
1393
self.failIf(WeaveToKnit.is_compatible(k, w))
1394
self.failIf(WeaveToKnit.is_compatible(w, w))
1395
self.failIf(WeaveToKnit.is_compatible(k, k))
1398
class TestKnitCaching(KnitTests):
1400
def create_knit(self, cache_add=False):
1401
k = self.make_test_knit(True)
1405
k.add_lines('text-1', [], split_lines(TEXT_1))
1406
k.add_lines('text-2', [], split_lines(TEXT_2))
1409
def test_no_caching(self):
1410
k = self.create_knit()
1411
# Nothing should be cached without setting 'enable_cache'
1412
self.assertEqual({}, k._data._cache)
1414
def test_cache_add_and_clear(self):
1415
k = self.create_knit(True)
1417
self.assertEqual(['text-1', 'text-2'], sorted(k._data._cache.keys()))
1420
self.assertEqual({}, k._data._cache)
1422
def test_cache_data_read_raw(self):
1423
k = self.create_knit()
1425
# Now cache and read
1428
def read_one_raw(version):
1429
pos_map = k._get_components_positions([version])
1430
method, pos, size, next = pos_map[version]
1431
lst = list(k._data.read_records_iter_raw([(version, pos, size)]))
1432
self.assertEqual(1, len(lst))
1435
val = read_one_raw('text-1')
1436
self.assertEqual({'text-1':val[1]}, k._data._cache)
1439
# After clear, new reads are not cached
1440
self.assertEqual({}, k._data._cache)
1442
val2 = read_one_raw('text-1')
1443
self.assertEqual(val, val2)
1444
self.assertEqual({}, k._data._cache)
1446
def test_cache_data_read(self):
1447
k = self.create_knit()
1449
def read_one(version):
1450
pos_map = k._get_components_positions([version])
1451
method, pos, size, next = pos_map[version]
1452
lst = list(k._data.read_records_iter([(version, pos, size)]))
1453
self.assertEqual(1, len(lst))
1456
# Now cache and read
1459
val = read_one('text-2')
1460
self.assertEqual(['text-2'], k._data._cache.keys())
1461
self.assertEqual('text-2', val[0])
1462
content, digest = k._data._parse_record('text-2',
1463
k._data._cache['text-2'])
1464
self.assertEqual(content, val[1])
1465
self.assertEqual(digest, val[2])
1468
self.assertEqual({}, k._data._cache)
1470
val2 = read_one('text-2')
1471
self.assertEqual(val, val2)
1472
self.assertEqual({}, k._data._cache)
1474
def test_cache_read(self):
1475
k = self.create_knit()
1478
text = k.get_text('text-1')
1479
self.assertEqual(TEXT_1, text)
1480
self.assertEqual(['text-1'], k._data._cache.keys())
1483
self.assertEqual({}, k._data._cache)
1485
text = k.get_text('text-1')
1486
self.assertEqual(TEXT_1, text)
1487
self.assertEqual({}, k._data._cache)
1490
class TestKnitIndex(KnitTests):
1492
def test_add_versions_dictionary_compresses(self):
1493
"""Adding versions to the index should update the lookup dict"""
1494
knit = self.make_test_knit()
1496
idx.add_version('a-1', ['fulltext'], 0, 0, [])
1497
self.check_file_contents('test.kndx',
1498
'# bzr knit index 8\n'
1500
'a-1 fulltext 0 0 :'
1502
idx.add_versions([('a-2', ['fulltext'], 0, 0, ['a-1']),
1503
('a-3', ['fulltext'], 0, 0, ['a-2']),
1505
self.check_file_contents('test.kndx',
1506
'# bzr knit index 8\n'
1508
'a-1 fulltext 0 0 :\n'
1509
'a-2 fulltext 0 0 0 :\n'
1510
'a-3 fulltext 0 0 1 :'
1512
self.assertEqual(['a-1', 'a-2', 'a-3'], idx._history)
1513
self.assertEqual({'a-1':('a-1', ['fulltext'], 0, 0, [], 0),
1514
'a-2':('a-2', ['fulltext'], 0, 0, ['a-1'], 1),
1515
'a-3':('a-3', ['fulltext'], 0, 0, ['a-2'], 2),
1518
def test_add_versions_fails_clean(self):
1519
"""If add_versions fails in the middle, it restores a pristine state.
1521
Any modifications that are made to the index are reset if all versions
1524
# This cheats a little bit by passing in a generator which will
1525
# raise an exception before the processing finishes
1526
# Other possibilities would be to have an version with the wrong number
1527
# of entries, or to make the backing transport unable to write any
1530
knit = self.make_test_knit()
1532
idx.add_version('a-1', ['fulltext'], 0, 0, [])
1534
class StopEarly(Exception):
1537
def generate_failure():
1538
"""Add some entries and then raise an exception"""
1539
yield ('a-2', ['fulltext'], 0, 0, ['a-1'])
1540
yield ('a-3', ['fulltext'], 0, 0, ['a-2'])
1543
# Assert the pre-condition
1544
self.assertEqual(['a-1'], idx._history)
1545
self.assertEqual({'a-1':('a-1', ['fulltext'], 0, 0, [], 0)}, idx._cache)
1547
self.assertRaises(StopEarly, idx.add_versions, generate_failure())
1549
# And it shouldn't be modified
1550
self.assertEqual(['a-1'], idx._history)
1551
self.assertEqual({'a-1':('a-1', ['fulltext'], 0, 0, [], 0)}, idx._cache)
1553
def test_knit_index_ignores_empty_files(self):
1554
# There was a race condition in older bzr, where a ^C at the right time
1555
# could leave an empty .kndx file, which bzr would later claim was a
1556
# corrupted file since the header was not present. In reality, the file
1557
# just wasn't created, so it should be ignored.
1558
t = get_transport('.')
1559
t.put_bytes('test.kndx', '')
1561
knit = self.make_test_knit()
1563
def test_knit_index_checks_header(self):
1564
t = get_transport('.')
1565
t.put_bytes('test.kndx', '# not really a knit header\n\n')
1567
self.assertRaises(KnitHeaderError, self.make_test_knit)