1
# Copyright (C) 2005, 2006, 2007 Canonical Ltd
3
# This program is free software; you can redistribute it and/or modify
4
# it under the terms of the GNU General Public License as published by
5
# the Free Software Foundation; either version 2 of the License, or
6
# (at your option) any later version.
8
# This program is distributed in the hope that it will be useful,
9
# but WITHOUT ANY WARRANTY; without even the implied warranty of
10
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11
# GNU General Public License for more details.
13
# You should have received a copy of the GNU General Public License
14
# along with this program; if not, write to the Free Software
15
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
17
"""Tests for Knit data structure"""
19
from cStringIO import StringIO
31
from bzrlib.errors import (
32
RevisionAlreadyPresent,
37
from bzrlib.index import *
38
from bzrlib.knit import (
50
from bzrlib.osutils import split_lines
51
from bzrlib.symbol_versioning import one_four
52
from bzrlib.tests import (
55
TestCaseWithMemoryTransport,
56
TestCaseWithTransport,
58
from bzrlib.transport import get_transport
59
from bzrlib.transport.memory import MemoryTransport
60
from bzrlib.tuned_gzip import GzipFile
61
from bzrlib.versionedfile import ConstantMapper
64
class _CompiledKnitFeature(Feature):
68
import bzrlib._knit_load_data_c
73
def feature_name(self):
74
return 'bzrlib._knit_load_data_c'
76
CompiledKnitFeature = _CompiledKnitFeature()
79
class KnitContentTestsMixin(object):
81
def test_constructor(self):
82
content = self._make_content([])
85
content = self._make_content([])
86
self.assertEqual(content.text(), [])
88
content = self._make_content([("origin1", "text1"), ("origin2", "text2")])
89
self.assertEqual(content.text(), ["text1", "text2"])
92
content = self._make_content([("origin1", "text1"), ("origin2", "text2")])
94
self.assertIsInstance(copy, content.__class__)
95
self.assertEqual(copy.annotate(), content.annotate())
97
def assertDerivedBlocksEqual(self, source, target, noeol=False):
98
"""Assert that the derived matching blocks match real output"""
99
source_lines = source.splitlines(True)
100
target_lines = target.splitlines(True)
102
if noeol and not line.endswith('\n'):
106
source_content = self._make_content([(None, nl(l)) for l in source_lines])
107
target_content = self._make_content([(None, nl(l)) for l in target_lines])
108
line_delta = source_content.line_delta(target_content)
109
delta_blocks = list(KnitContent.get_line_delta_blocks(line_delta,
110
source_lines, target_lines))
111
matcher = KnitSequenceMatcher(None, source_lines, target_lines)
112
matcher_blocks = list(list(matcher.get_matching_blocks()))
113
self.assertEqual(matcher_blocks, delta_blocks)
115
def test_get_line_delta_blocks(self):
116
self.assertDerivedBlocksEqual('a\nb\nc\n', 'q\nc\n')
117
self.assertDerivedBlocksEqual(TEXT_1, TEXT_1)
118
self.assertDerivedBlocksEqual(TEXT_1, TEXT_1A)
119
self.assertDerivedBlocksEqual(TEXT_1, TEXT_1B)
120
self.assertDerivedBlocksEqual(TEXT_1B, TEXT_1A)
121
self.assertDerivedBlocksEqual(TEXT_1A, TEXT_1B)
122
self.assertDerivedBlocksEqual(TEXT_1A, '')
123
self.assertDerivedBlocksEqual('', TEXT_1A)
124
self.assertDerivedBlocksEqual('', '')
125
self.assertDerivedBlocksEqual('a\nb\nc', 'a\nb\nc\nd')
127
def test_get_line_delta_blocks_noeol(self):
128
"""Handle historical knit deltas safely
130
Some existing knit deltas don't consider the last line to differ
131
when the only difference whether it has a final newline.
133
New knit deltas appear to always consider the last line to differ
136
self.assertDerivedBlocksEqual('a\nb\nc', 'a\nb\nc\nd\n', noeol=True)
137
self.assertDerivedBlocksEqual('a\nb\nc\nd\n', 'a\nb\nc', noeol=True)
138
self.assertDerivedBlocksEqual('a\nb\nc\n', 'a\nb\nc', noeol=True)
139
self.assertDerivedBlocksEqual('a\nb\nc', 'a\nb\nc\n', noeol=True)
151
Banana cup cake recipe
161
Banana cup cake recipe
163
- bananas (do not use plantains!!!)
170
Banana cup cake recipe
187
class TestPlainKnitContent(TestCase, KnitContentTestsMixin):
189
def _make_content(self, lines):
190
annotated_content = AnnotatedKnitContent(lines)
191
return PlainKnitContent(annotated_content.text(), 'bogus')
193
def test_annotate(self):
194
content = self._make_content([])
195
self.assertEqual(content.annotate(), [])
197
content = self._make_content([("origin1", "text1"), ("origin2", "text2")])
198
self.assertEqual(content.annotate(),
199
[("bogus", "text1"), ("bogus", "text2")])
201
def test_line_delta(self):
202
content1 = self._make_content([("", "a"), ("", "b")])
203
content2 = self._make_content([("", "a"), ("", "a"), ("", "c")])
204
self.assertEqual(content1.line_delta(content2),
205
[(1, 2, 2, ["a", "c"])])
207
def test_line_delta_iter(self):
208
content1 = self._make_content([("", "a"), ("", "b")])
209
content2 = self._make_content([("", "a"), ("", "a"), ("", "c")])
210
it = content1.line_delta_iter(content2)
211
self.assertEqual(it.next(), (1, 2, 2, ["a", "c"]))
212
self.assertRaises(StopIteration, it.next)
215
class TestAnnotatedKnitContent(TestCase, KnitContentTestsMixin):
217
def _make_content(self, lines):
218
return AnnotatedKnitContent(lines)
220
def test_annotate(self):
221
content = self._make_content([])
222
self.assertEqual(content.annotate(), [])
224
content = self._make_content([("origin1", "text1"), ("origin2", "text2")])
225
self.assertEqual(content.annotate(),
226
[("origin1", "text1"), ("origin2", "text2")])
228
def test_line_delta(self):
229
content1 = self._make_content([("", "a"), ("", "b")])
230
content2 = self._make_content([("", "a"), ("", "a"), ("", "c")])
231
self.assertEqual(content1.line_delta(content2),
232
[(1, 2, 2, [("", "a"), ("", "c")])])
234
def test_line_delta_iter(self):
235
content1 = self._make_content([("", "a"), ("", "b")])
236
content2 = self._make_content([("", "a"), ("", "a"), ("", "c")])
237
it = content1.line_delta_iter(content2)
238
self.assertEqual(it.next(), (1, 2, 2, [("", "a"), ("", "c")]))
239
self.assertRaises(StopIteration, it.next)
242
class MockTransport(object):
244
def __init__(self, file_lines=None):
245
self.file_lines = file_lines
247
# We have no base directory for the MockTransport
250
def get(self, filename):
251
if self.file_lines is None:
252
raise NoSuchFile(filename)
254
return StringIO("\n".join(self.file_lines))
256
def readv(self, relpath, offsets):
257
fp = self.get(relpath)
258
for offset, size in offsets:
260
yield offset, fp.read(size)
262
def __getattr__(self, name):
263
def queue_call(*args, **kwargs):
264
self.calls.append((name, args, kwargs))
268
class KnitRecordAccessTestsMixin(object):
269
"""Tests for getting and putting knit records."""
271
def test_add_raw_records(self):
272
"""Add_raw_records adds records retrievable later."""
273
access = self.get_access()
274
memos = access.add_raw_records([('key', 10)], '1234567890')
275
self.assertEqual(['1234567890'], list(access.get_raw_records(memos)))
277
def test_add_several_raw_records(self):
278
"""add_raw_records with many records and read some back."""
279
access = self.get_access()
280
memos = access.add_raw_records([('key', 10), ('key2', 2), ('key3', 5)],
282
self.assertEqual(['1234567890', '12', '34567'],
283
list(access.get_raw_records(memos)))
284
self.assertEqual(['1234567890'],
285
list(access.get_raw_records(memos[0:1])))
286
self.assertEqual(['12'],
287
list(access.get_raw_records(memos[1:2])))
288
self.assertEqual(['34567'],
289
list(access.get_raw_records(memos[2:3])))
290
self.assertEqual(['1234567890', '34567'],
291
list(access.get_raw_records(memos[0:1] + memos[2:3])))
294
class TestKnitKnitAccess(TestCaseWithMemoryTransport, KnitRecordAccessTestsMixin):
295
"""Tests for the .kndx implementation."""
297
def get_access(self):
298
"""Get a .knit style access instance."""
299
mapper = ConstantMapper("foo")
300
access = _KnitKeyAccess(self.get_transport(), mapper)
304
class TestPackKnitAccess(TestCaseWithMemoryTransport, KnitRecordAccessTestsMixin):
305
"""Tests for the pack based access."""
307
def get_access(self):
308
return self._get_access()[0]
310
def _get_access(self, packname='packfile', index='FOO'):
311
transport = self.get_transport()
312
def write_data(bytes):
313
transport.append_bytes(packname, bytes)
314
writer = pack.ContainerWriter(write_data)
316
access = _DirectPackAccess({})
317
access.set_writer(writer, index, (transport, packname))
318
return access, writer
320
def test_read_from_several_packs(self):
321
access, writer = self._get_access()
323
memos.extend(access.add_raw_records([('key', 10)], '1234567890'))
325
access, writer = self._get_access('pack2', 'FOOBAR')
326
memos.extend(access.add_raw_records([('key', 5)], '12345'))
328
access, writer = self._get_access('pack3', 'BAZ')
329
memos.extend(access.add_raw_records([('key', 5)], 'alpha'))
331
transport = self.get_transport()
332
access = _DirectPackAccess({"FOO":(transport, 'packfile'),
333
"FOOBAR":(transport, 'pack2'),
334
"BAZ":(transport, 'pack3')})
335
self.assertEqual(['1234567890', '12345', 'alpha'],
336
list(access.get_raw_records(memos)))
337
self.assertEqual(['1234567890'],
338
list(access.get_raw_records(memos[0:1])))
339
self.assertEqual(['12345'],
340
list(access.get_raw_records(memos[1:2])))
341
self.assertEqual(['alpha'],
342
list(access.get_raw_records(memos[2:3])))
343
self.assertEqual(['1234567890', 'alpha'],
344
list(access.get_raw_records(memos[0:1] + memos[2:3])))
346
def test_set_writer(self):
347
"""The writer should be settable post construction."""
348
access = _DirectPackAccess({})
349
transport = self.get_transport()
350
packname = 'packfile'
352
def write_data(bytes):
353
transport.append_bytes(packname, bytes)
354
writer = pack.ContainerWriter(write_data)
356
access.set_writer(writer, index, (transport, packname))
357
memos = access.add_raw_records([('key', 10)], '1234567890')
359
self.assertEqual(['1234567890'], list(access.get_raw_records(memos)))
362
class LowLevelKnitDataTests(TestCase):
364
def create_gz_content(self, text):
366
gz_file = gzip.GzipFile(mode='wb', fileobj=sio)
369
return sio.getvalue()
371
def test_valid_knit_data(self):
372
sha1sum = sha.new('foo\nbar\n').hexdigest()
373
gz_txt = self.create_gz_content('version rev-id-1 2 %s\n'
378
transport = MockTransport([gz_txt])
379
access = _KnitKeyAccess(transport, ConstantMapper('filename'))
380
knit = KnitVersionedFiles(None, access)
381
records = [(('rev-id-1',), (('rev-id-1',), 0, len(gz_txt)))]
383
contents = list(knit._read_records_iter(records))
384
self.assertEqual([(('rev-id-1',), ['foo\n', 'bar\n'],
385
'4e48e2c9a3d2ca8a708cb0cc545700544efb5021')], contents)
387
raw_contents = list(knit._read_records_iter_raw(records))
388
self.assertEqual([(('rev-id-1',), gz_txt, sha1sum)], raw_contents)
390
def test_not_enough_lines(self):
391
sha1sum = sha.new('foo\n').hexdigest()
392
# record says 2 lines data says 1
393
gz_txt = self.create_gz_content('version rev-id-1 2 %s\n'
397
transport = MockTransport([gz_txt])
398
access = _KnitKeyAccess(transport, ConstantMapper('filename'))
399
knit = KnitVersionedFiles(None, access)
400
records = [(('rev-id-1',), (('rev-id-1',), 0, len(gz_txt)))]
401
self.assertRaises(errors.KnitCorrupt, list,
402
knit._read_records_iter(records))
404
# read_records_iter_raw won't detect that sort of mismatch/corruption
405
raw_contents = list(knit._read_records_iter_raw(records))
406
self.assertEqual([(('rev-id-1',), gz_txt, sha1sum)], raw_contents)
408
def test_too_many_lines(self):
409
sha1sum = sha.new('foo\nbar\n').hexdigest()
410
# record says 1 lines data says 2
411
gz_txt = self.create_gz_content('version rev-id-1 1 %s\n'
416
transport = MockTransport([gz_txt])
417
access = _KnitKeyAccess(transport, ConstantMapper('filename'))
418
knit = KnitVersionedFiles(None, access)
419
records = [(('rev-id-1',), (('rev-id-1',), 0, len(gz_txt)))]
420
self.assertRaises(errors.KnitCorrupt, list,
421
knit._read_records_iter(records))
423
# read_records_iter_raw won't detect that sort of mismatch/corruption
424
raw_contents = list(knit._read_records_iter_raw(records))
425
self.assertEqual([(('rev-id-1',), gz_txt, sha1sum)], raw_contents)
427
def test_mismatched_version_id(self):
428
sha1sum = sha.new('foo\nbar\n').hexdigest()
429
gz_txt = self.create_gz_content('version rev-id-1 2 %s\n'
434
transport = MockTransport([gz_txt])
435
access = _KnitKeyAccess(transport, ConstantMapper('filename'))
436
knit = KnitVersionedFiles(None, access)
437
# We are asking for rev-id-2, but the data is rev-id-1
438
records = [(('rev-id-2',), (('rev-id-2',), 0, len(gz_txt)))]
439
self.assertRaises(errors.KnitCorrupt, list,
440
knit._read_records_iter(records))
442
# read_records_iter_raw detects mismatches in the header
443
self.assertRaises(errors.KnitCorrupt, list,
444
knit._read_records_iter_raw(records))
446
def test_uncompressed_data(self):
447
sha1sum = sha.new('foo\nbar\n').hexdigest()
448
txt = ('version rev-id-1 2 %s\n'
453
transport = MockTransport([txt])
454
access = _KnitKeyAccess(transport, ConstantMapper('filename'))
455
knit = KnitVersionedFiles(None, access)
456
records = [(('rev-id-1',), (('rev-id-1',), 0, len(txt)))]
458
# We don't have valid gzip data ==> corrupt
459
self.assertRaises(errors.KnitCorrupt, list,
460
knit._read_records_iter(records))
462
# read_records_iter_raw will notice the bad data
463
self.assertRaises(errors.KnitCorrupt, list,
464
knit._read_records_iter_raw(records))
466
def test_corrupted_data(self):
467
sha1sum = sha.new('foo\nbar\n').hexdigest()
468
gz_txt = self.create_gz_content('version rev-id-1 2 %s\n'
473
# Change 2 bytes in the middle to \xff
474
gz_txt = gz_txt[:10] + '\xff\xff' + gz_txt[12:]
475
transport = MockTransport([gz_txt])
476
access = _KnitKeyAccess(transport, ConstantMapper('filename'))
477
knit = KnitVersionedFiles(None, access)
478
records = [(('rev-id-1',), (('rev-id-1',), 0, len(gz_txt)))]
479
self.assertRaises(errors.KnitCorrupt, list,
480
knit._read_records_iter(records))
481
# read_records_iter_raw will barf on bad gz data
482
self.assertRaises(errors.KnitCorrupt, list,
483
knit._read_records_iter_raw(records))
486
class LowLevelKnitIndexTests(TestCase):
488
def get_knit_index(self, transport, name, mode):
489
mapper = ConstantMapper(name)
490
orig = knit._load_data
492
knit._load_data = orig
493
self.addCleanup(reset)
494
from bzrlib._knit_load_data_py import _load_data_py
495
knit._load_data = _load_data_py
496
allow_writes = lambda: 'w' in mode
497
return _KndxIndex(transport, mapper, lambda:None, allow_writes, lambda:True)
499
def test_create_file(self):
500
transport = MockTransport()
501
index = self.get_knit_index(transport, "filename", "w")
503
call = transport.calls.pop(0)
504
# call[1][1] is a StringIO - we can't test it by simple equality.
505
self.assertEqual('put_file_non_atomic', call[0])
506
self.assertEqual('filename.kndx', call[1][0])
507
# With no history, _KndxIndex writes a new index:
508
self.assertEqual(_KndxIndex.HEADER,
509
call[1][1].getvalue())
510
self.assertEqual({'create_parent_dir': True}, call[2])
512
def test_read_utf8_version_id(self):
513
unicode_revision_id = u"version-\N{CYRILLIC CAPITAL LETTER A}"
514
utf8_revision_id = unicode_revision_id.encode('utf-8')
515
transport = MockTransport([
517
'%s option 0 1 :' % (utf8_revision_id,)
519
index = self.get_knit_index(transport, "filename", "r")
520
# _KndxIndex is a private class, and deals in utf8 revision_ids, not
521
# Unicode revision_ids.
522
self.assertEqual({(utf8_revision_id,):()},
523
index.get_parent_map(index.keys()))
524
self.assertFalse((unicode_revision_id,) in index.keys())
526
def test_read_utf8_parents(self):
527
unicode_revision_id = u"version-\N{CYRILLIC CAPITAL LETTER A}"
528
utf8_revision_id = unicode_revision_id.encode('utf-8')
529
transport = MockTransport([
531
"version option 0 1 .%s :" % (utf8_revision_id,)
533
index = self.get_knit_index(transport, "filename", "r")
534
self.assertEqual({("version",):((utf8_revision_id,),)},
535
index.get_parent_map(index.keys()))
537
def test_read_ignore_corrupted_lines(self):
538
transport = MockTransport([
541
"corrupted options 0 1 .b .c ",
542
"version options 0 1 :"
544
index = self.get_knit_index(transport, "filename", "r")
545
self.assertEqual(1, len(index.keys()))
546
self.assertEqual(set([("version",)]), index.keys())
548
def test_read_corrupted_header(self):
549
transport = MockTransport(['not a bzr knit index header\n'])
550
index = self.get_knit_index(transport, "filename", "r")
551
self.assertRaises(KnitHeaderError, index.keys)
553
def test_read_duplicate_entries(self):
554
transport = MockTransport([
556
"parent options 0 1 :",
557
"version options1 0 1 0 :",
558
"version options2 1 2 .other :",
559
"version options3 3 4 0 .other :"
561
index = self.get_knit_index(transport, "filename", "r")
562
self.assertEqual(2, len(index.keys()))
563
# check that the index used is the first one written. (Specific
564
# to KnitIndex style indices.
565
self.assertEqual("1", index._dictionary_compress([("version",)]))
566
self.assertEqual((("version",), 3, 4), index.get_position(("version",)))
567
self.assertEqual(["options3"], index.get_options(("version",)))
568
self.assertEqual({("version",):(("parent",), ("other",))},
569
index.get_parent_map([("version",)]))
571
def test_read_compressed_parents(self):
572
transport = MockTransport([
576
"c option 0 1 1 0 :",
578
index = self.get_knit_index(transport, "filename", "r")
579
self.assertEqual({("b",):(("a",),), ("c",):(("b",), ("a",))},
580
index.get_parent_map([("b",), ("c",)]))
582
def test_write_utf8_version_id(self):
583
unicode_revision_id = u"version-\N{CYRILLIC CAPITAL LETTER A}"
584
utf8_revision_id = unicode_revision_id.encode('utf-8')
585
transport = MockTransport([
588
index = self.get_knit_index(transport, "filename", "r")
590
((utf8_revision_id,), ["option"], ((utf8_revision_id,), 0, 1), [])])
591
call = transport.calls.pop(0)
592
# call[1][1] is a StringIO - we can't test it by simple equality.
593
self.assertEqual('put_file_non_atomic', call[0])
594
self.assertEqual('filename.kndx', call[1][0])
595
# With no history, _KndxIndex writes a new index:
596
self.assertEqual(_KndxIndex.HEADER +
597
"\n%s option 0 1 :" % (utf8_revision_id,),
598
call[1][1].getvalue())
599
self.assertEqual({'create_parent_dir': True}, call[2])
601
def test_write_utf8_parents(self):
602
unicode_revision_id = u"version-\N{CYRILLIC CAPITAL LETTER A}"
603
utf8_revision_id = unicode_revision_id.encode('utf-8')
604
transport = MockTransport([
607
index = self.get_knit_index(transport, "filename", "r")
609
(("version",), ["option"], (("version",), 0, 1), [(utf8_revision_id,)])])
610
call = transport.calls.pop(0)
611
# call[1][1] is a StringIO - we can't test it by simple equality.
612
self.assertEqual('put_file_non_atomic', call[0])
613
self.assertEqual('filename.kndx', call[1][0])
614
# With no history, _KndxIndex writes a new index:
615
self.assertEqual(_KndxIndex.HEADER +
616
"\nversion option 0 1 .%s :" % (utf8_revision_id,),
617
call[1][1].getvalue())
618
self.assertEqual({'create_parent_dir': True}, call[2])
621
transport = MockTransport([
624
index = self.get_knit_index(transport, "filename", "r")
626
self.assertEqual(set(), index.keys())
628
index.add_records([(("a",), ["option"], (("a",), 0, 1), [])])
629
self.assertEqual(set([("a",)]), index.keys())
631
index.add_records([(("a",), ["option"], (("a",), 0, 1), [])])
632
self.assertEqual(set([("a",)]), index.keys())
634
index.add_records([(("b",), ["option"], (("b",), 0, 1), [])])
635
self.assertEqual(set([("a",), ("b",)]), index.keys())
637
def add_a_b(self, index, random_id=None):
639
if random_id is not None:
640
kwargs["random_id"] = random_id
642
(("a",), ["option"], (("a",), 0, 1), [("b",)]),
643
(("a",), ["opt"], (("a",), 1, 2), [("c",)]),
644
(("b",), ["option"], (("b",), 2, 3), [("a",)])
647
def assertIndexIsAB(self, index):
652
index.get_parent_map(index.keys()))
653
self.assertEqual((("a",), 1, 2), index.get_position(("a",)))
654
self.assertEqual((("b",), 2, 3), index.get_position(("b",)))
655
self.assertEqual(["opt"], index.get_options(("a",)))
657
def test_add_versions(self):
658
transport = MockTransport([
661
index = self.get_knit_index(transport, "filename", "r")
664
call = transport.calls.pop(0)
665
# call[1][1] is a StringIO - we can't test it by simple equality.
666
self.assertEqual('put_file_non_atomic', call[0])
667
self.assertEqual('filename.kndx', call[1][0])
668
# With no history, _KndxIndex writes a new index:
671
"\na option 0 1 .b :"
673
"\nb option 2 3 0 :",
674
call[1][1].getvalue())
675
self.assertEqual({'create_parent_dir': True}, call[2])
676
self.assertIndexIsAB(index)
678
def test_add_versions_random_id_is_accepted(self):
679
transport = MockTransport([
682
index = self.get_knit_index(transport, "filename", "r")
683
self.add_a_b(index, random_id=True)
685
def test_delay_create_and_add_versions(self):
686
transport = MockTransport()
688
index = self.get_knit_index(transport, "filename", "w")
690
self.assertEqual([], transport.calls)
693
#[ {"dir_mode": 0777, "create_parent_dir": True, "mode": "wb"},
695
# Two calls: one during which we load the existing index (and when its
696
# missing create it), then a second where we write the contents out.
697
self.assertEqual(2, len(transport.calls))
698
call = transport.calls.pop(0)
699
self.assertEqual('put_file_non_atomic', call[0])
700
self.assertEqual('filename.kndx', call[1][0])
701
# With no history, _KndxIndex writes a new index:
702
self.assertEqual(_KndxIndex.HEADER, call[1][1].getvalue())
703
self.assertEqual({'create_parent_dir': True}, call[2])
704
call = transport.calls.pop(0)
705
# call[1][1] is a StringIO - we can't test it by simple equality.
706
self.assertEqual('put_file_non_atomic', call[0])
707
self.assertEqual('filename.kndx', call[1][0])
708
# With no history, _KndxIndex writes a new index:
711
"\na option 0 1 .b :"
713
"\nb option 2 3 0 :",
714
call[1][1].getvalue())
715
self.assertEqual({'create_parent_dir': True}, call[2])
717
def test_get_position(self):
718
transport = MockTransport([
723
index = self.get_knit_index(transport, "filename", "r")
725
self.assertEqual((("a",), 0, 1), index.get_position(("a",)))
726
self.assertEqual((("b",), 1, 2), index.get_position(("b",)))
728
def test_get_method(self):
729
transport = MockTransport([
731
"a fulltext,unknown 0 1 :",
732
"b unknown,line-delta 1 2 :",
735
index = self.get_knit_index(transport, "filename", "r")
737
self.assertEqual("fulltext", index.get_method("a"))
738
self.assertEqual("line-delta", index.get_method("b"))
739
self.assertRaises(errors.KnitIndexUnknownMethod, index.get_method, "c")
741
def test_get_options(self):
742
transport = MockTransport([
747
index = self.get_knit_index(transport, "filename", "r")
749
self.assertEqual(["opt1"], index.get_options("a"))
750
self.assertEqual(["opt2", "opt3"], index.get_options("b"))
752
def test_get_parent_map(self):
753
transport = MockTransport([
756
"b option 1 2 0 .c :",
757
"c option 1 2 1 0 .e :"
759
index = self.get_knit_index(transport, "filename", "r")
763
("b",):(("a",), ("c",)),
764
("c",):(("b",), ("a",), ("e",)),
765
}, index.get_parent_map(index.keys()))
767
def test_impossible_parent(self):
768
"""Test we get KnitCorrupt if the parent couldn't possibly exist."""
769
transport = MockTransport([
772
"b option 0 1 4 :" # We don't have a 4th record
774
index = self.get_knit_index(transport, 'filename', 'r')
776
self.assertRaises(errors.KnitCorrupt, index.keys)
778
if (str(e) == ('exceptions must be strings, classes, or instances,'
779
' not exceptions.IndexError')
780
and sys.version_info[0:2] >= (2,5)):
781
self.knownFailure('Pyrex <0.9.5 fails with TypeError when'
782
' raising new style exceptions with python'
787
def test_corrupted_parent(self):
788
transport = MockTransport([
792
"c option 0 1 1v :", # Can't have a parent of '1v'
794
index = self.get_knit_index(transport, 'filename', 'r')
796
self.assertRaises(errors.KnitCorrupt, index.keys)
798
if (str(e) == ('exceptions must be strings, classes, or instances,'
799
' not exceptions.ValueError')
800
and sys.version_info[0:2] >= (2,5)):
801
self.knownFailure('Pyrex <0.9.5 fails with TypeError when'
802
' raising new style exceptions with python'
807
def test_corrupted_parent_in_list(self):
808
transport = MockTransport([
812
"c option 0 1 1 v :", # Can't have a parent of 'v'
814
index = self.get_knit_index(transport, 'filename', 'r')
816
self.assertRaises(errors.KnitCorrupt, index.keys)
818
if (str(e) == ('exceptions must be strings, classes, or instances,'
819
' not exceptions.ValueError')
820
and sys.version_info[0:2] >= (2,5)):
821
self.knownFailure('Pyrex <0.9.5 fails with TypeError when'
822
' raising new style exceptions with python'
827
def test_invalid_position(self):
828
transport = MockTransport([
832
index = self.get_knit_index(transport, 'filename', 'r')
834
self.assertRaises(errors.KnitCorrupt, index.keys)
836
if (str(e) == ('exceptions must be strings, classes, or instances,'
837
' not exceptions.ValueError')
838
and sys.version_info[0:2] >= (2,5)):
839
self.knownFailure('Pyrex <0.9.5 fails with TypeError when'
840
' raising new style exceptions with python'
845
def test_invalid_size(self):
846
transport = MockTransport([
850
index = self.get_knit_index(transport, 'filename', 'r')
852
self.assertRaises(errors.KnitCorrupt, index.keys)
854
if (str(e) == ('exceptions must be strings, classes, or instances,'
855
' not exceptions.ValueError')
856
and sys.version_info[0:2] >= (2,5)):
857
self.knownFailure('Pyrex <0.9.5 fails with TypeError when'
858
' raising new style exceptions with python'
863
def test_short_line(self):
864
transport = MockTransport([
867
"b option 10 10 0", # This line isn't terminated, ignored
869
index = self.get_knit_index(transport, "filename", "r")
870
self.assertEqual(set([('a',)]), index.keys())
872
def test_skip_incomplete_record(self):
873
# A line with bogus data should just be skipped
874
transport = MockTransport([
877
"b option 10 10 0", # This line isn't terminated, ignored
878
"c option 20 10 0 :", # Properly terminated, and starts with '\n'
880
index = self.get_knit_index(transport, "filename", "r")
881
self.assertEqual(set([('a',), ('c',)]), index.keys())
883
def test_trailing_characters(self):
884
# A line with bogus data should just be skipped
885
transport = MockTransport([
888
"b option 10 10 0 :a", # This line has extra trailing characters
889
"c option 20 10 0 :", # Properly terminated, and starts with '\n'
891
index = self.get_knit_index(transport, "filename", "r")
892
self.assertEqual(set([('a',), ('c',)]), index.keys())
895
class LowLevelKnitIndexTests_c(LowLevelKnitIndexTests):
897
_test_needs_features = [CompiledKnitFeature]
899
def get_knit_index(self, transport, name, mode):
900
mapper = ConstantMapper(name)
901
orig = knit._load_data
903
knit._load_data = orig
904
self.addCleanup(reset)
905
from bzrlib._knit_load_data_c import _load_data_c
906
knit._load_data = _load_data_c
907
allow_writes = lambda: mode == 'w'
908
return _KndxIndex(transport, mapper, lambda:None, allow_writes, lambda:True)
911
class KnitTests(TestCaseWithTransport):
912
"""Class containing knit test helper routines."""
914
def make_test_knit(self, annotate=False, name='test'):
915
mapper = ConstantMapper(name)
916
return make_file_factory(annotate, mapper)(self.get_transport())
919
class TestKnitIndex(KnitTests):
921
def test_add_versions_dictionary_compresses(self):
922
"""Adding versions to the index should update the lookup dict"""
923
knit = self.make_test_knit()
925
idx.add_records([(('a-1',), ['fulltext'], (('a-1',), 0, 0), [])])
926
self.check_file_contents('test.kndx',
927
'# bzr knit index 8\n'
932
(('a-2',), ['fulltext'], (('a-2',), 0, 0), [('a-1',)]),
933
(('a-3',), ['fulltext'], (('a-3',), 0, 0), [('a-2',)]),
935
self.check_file_contents('test.kndx',
936
'# bzr knit index 8\n'
938
'a-1 fulltext 0 0 :\n'
939
'a-2 fulltext 0 0 0 :\n'
940
'a-3 fulltext 0 0 1 :'
942
self.assertEqual(set([('a-3',), ('a-1',), ('a-2',)]), idx.keys())
944
('a-1',): ((('a-1',), 0, 0), None, (), ('fulltext', False)),
945
('a-2',): ((('a-2',), 0, 0), None, (('a-1',),), ('fulltext', False)),
946
('a-3',): ((('a-3',), 0, 0), None, (('a-2',),), ('fulltext', False)),
947
}, idx.get_build_details(idx.keys()))
948
self.assertEqual({('a-1',):(),
949
('a-2',):(('a-1',),),
950
('a-3',):(('a-2',),),},
951
idx.get_parent_map(idx.keys()))
953
def test_add_versions_fails_clean(self):
954
"""If add_versions fails in the middle, it restores a pristine state.
956
Any modifications that are made to the index are reset if all versions
959
# This cheats a little bit by passing in a generator which will
960
# raise an exception before the processing finishes
961
# Other possibilities would be to have an version with the wrong number
962
# of entries, or to make the backing transport unable to write any
965
knit = self.make_test_knit()
967
idx.add_records([(('a-1',), ['fulltext'], (('a-1',), 0, 0), [])])
969
class StopEarly(Exception):
972
def generate_failure():
973
"""Add some entries and then raise an exception"""
974
yield (('a-2',), ['fulltext'], (None, 0, 0), ('a-1',))
975
yield (('a-3',), ['fulltext'], (None, 0, 0), ('a-2',))
978
# Assert the pre-condition
980
self.assertEqual(set([('a-1',)]), set(idx.keys()))
982
{('a-1',): ((('a-1',), 0, 0), None, (), ('fulltext', False))},
983
idx.get_build_details([('a-1',)]))
984
self.assertEqual({('a-1',):()}, idx.get_parent_map(idx.keys()))
987
self.assertRaises(StopEarly, idx.add_records, generate_failure())
988
# And it shouldn't be modified
991
def test_knit_index_ignores_empty_files(self):
992
# There was a race condition in older bzr, where a ^C at the right time
993
# could leave an empty .kndx file, which bzr would later claim was a
994
# corrupted file since the header was not present. In reality, the file
995
# just wasn't created, so it should be ignored.
996
t = get_transport('.')
997
t.put_bytes('test.kndx', '')
999
knit = self.make_test_knit()
1001
def test_knit_index_checks_header(self):
1002
t = get_transport('.')
1003
t.put_bytes('test.kndx', '# not really a knit header\n\n')
1004
k = self.make_test_knit()
1005
self.assertRaises(KnitHeaderError, k.keys)
1008
class TestGraphIndexKnit(KnitTests):
1009
"""Tests for knits using a GraphIndex rather than a KnitIndex."""
1011
def make_g_index(self, name, ref_lists=0, nodes=[]):
1012
builder = GraphIndexBuilder(ref_lists)
1013
for node, references, value in nodes:
1014
builder.add_node(node, references, value)
1015
stream = builder.finish()
1016
trans = self.get_transport()
1017
size = trans.put_file(name, stream)
1018
return GraphIndex(trans, name, size)
1020
def two_graph_index(self, deltas=False, catch_adds=False):
1021
"""Build a two-graph index.
1023
:param deltas: If true, use underlying indices with two node-ref
1024
lists and 'parent' set to a delta-compressed against tail.
1026
# build a complex graph across several indices.
1028
# delta compression inn the index
1029
index1 = self.make_g_index('1', 2, [
1030
(('tip', ), 'N0 100', ([('parent', )], [], )),
1031
(('tail', ), '', ([], []))])
1032
index2 = self.make_g_index('2', 2, [
1033
(('parent', ), ' 100 78', ([('tail', ), ('ghost', )], [('tail', )])),
1034
(('separate', ), '', ([], []))])
1036
# just blob location and graph in the index.
1037
index1 = self.make_g_index('1', 1, [
1038
(('tip', ), 'N0 100', ([('parent', )], )),
1039
(('tail', ), '', ([], ))])
1040
index2 = self.make_g_index('2', 1, [
1041
(('parent', ), ' 100 78', ([('tail', ), ('ghost', )], )),
1042
(('separate', ), '', ([], ))])
1043
combined_index = CombinedGraphIndex([index1, index2])
1045
self.combined_index = combined_index
1046
self.caught_entries = []
1047
add_callback = self.catch_add
1050
return _KnitGraphIndex(combined_index, lambda:True, deltas=deltas,
1051
add_callback=add_callback)
1053
def test_keys(self):
1054
index = self.two_graph_index()
1055
self.assertEqual(set([('tail',), ('tip',), ('parent',), ('separate',)]),
1058
def test_get_position(self):
1059
index = self.two_graph_index()
1060
self.assertEqual((index._graph_index._indices[0], 0, 100), index.get_position(('tip',)))
1061
self.assertEqual((index._graph_index._indices[1], 100, 78), index.get_position(('parent',)))
1063
def test_get_method_deltas(self):
1064
index = self.two_graph_index(deltas=True)
1065
self.assertEqual('fulltext', index.get_method(('tip',)))
1066
self.assertEqual('line-delta', index.get_method(('parent',)))
1068
def test_get_method_no_deltas(self):
1069
# check that the parent-history lookup is ignored with deltas=False.
1070
index = self.two_graph_index(deltas=False)
1071
self.assertEqual('fulltext', index.get_method(('tip',)))
1072
self.assertEqual('fulltext', index.get_method(('parent',)))
1074
def test_get_options_deltas(self):
1075
index = self.two_graph_index(deltas=True)
1076
self.assertEqual(['fulltext', 'no-eol'], index.get_options(('tip',)))
1077
self.assertEqual(['line-delta'], index.get_options(('parent',)))
1079
def test_get_options_no_deltas(self):
1080
# check that the parent-history lookup is ignored with deltas=False.
1081
index = self.two_graph_index(deltas=False)
1082
self.assertEqual(['fulltext', 'no-eol'], index.get_options(('tip',)))
1083
self.assertEqual(['fulltext'], index.get_options(('parent',)))
1085
def test_get_parent_map(self):
1086
index = self.two_graph_index()
1087
self.assertEqual({('parent',):(('tail',), ('ghost',))},
1088
index.get_parent_map([('parent',), ('ghost',)]))
1090
def catch_add(self, entries):
1091
self.caught_entries.append(entries)
1093
def test_add_no_callback_errors(self):
1094
index = self.two_graph_index()
1095
self.assertRaises(errors.ReadOnlyError, index.add_records,
1096
[(('new',), 'fulltext,no-eol', (None, 50, 60), ['separate'])])
1098
def test_add_version_smoke(self):
1099
index = self.two_graph_index(catch_adds=True)
1100
index.add_records([(('new',), 'fulltext,no-eol', (None, 50, 60),
1102
self.assertEqual([[(('new', ), 'N50 60', ((('separate',),),))]],
1103
self.caught_entries)
1105
def test_add_version_delta_not_delta_index(self):
1106
index = self.two_graph_index(catch_adds=True)
1107
self.assertRaises(errors.KnitCorrupt, index.add_records,
1108
[(('new',), 'no-eol,line-delta', (None, 0, 100), [('parent',)])])
1109
self.assertEqual([], self.caught_entries)
1111
def test_add_version_same_dup(self):
1112
index = self.two_graph_index(catch_adds=True)
1113
# options can be spelt two different ways
1114
index.add_records([(('tip',), 'fulltext,no-eol', (None, 0, 100), [('parent',)])])
1115
index.add_records([(('tip',), 'no-eol,fulltext', (None, 0, 100), [('parent',)])])
1116
# position/length are ignored (because each pack could have fulltext or
1117
# delta, and be at a different position.
1118
index.add_records([(('tip',), 'fulltext,no-eol', (None, 50, 100),
1120
index.add_records([(('tip',), 'fulltext,no-eol', (None, 0, 1000),
1122
# but neither should have added data:
1123
self.assertEqual([[], [], [], []], self.caught_entries)
1125
def test_add_version_different_dup(self):
1126
index = self.two_graph_index(deltas=True, catch_adds=True)
1128
self.assertRaises(errors.KnitCorrupt, index.add_records,
1129
[(('tip',), 'no-eol,line-delta', (None, 0, 100), [('parent',)])])
1130
self.assertRaises(errors.KnitCorrupt, index.add_records,
1131
[(('tip',), 'line-delta,no-eol', (None, 0, 100), [('parent',)])])
1132
self.assertRaises(errors.KnitCorrupt, index.add_records,
1133
[(('tip',), 'fulltext', (None, 0, 100), [('parent',)])])
1135
self.assertRaises(errors.KnitCorrupt, index.add_records,
1136
[(('tip',), 'fulltext,no-eol', (None, 0, 100), [])])
1137
self.assertEqual([], self.caught_entries)
1139
def test_add_versions_nodeltas(self):
1140
index = self.two_graph_index(catch_adds=True)
1142
(('new',), 'fulltext,no-eol', (None, 50, 60), [('separate',)]),
1143
(('new2',), 'fulltext', (None, 0, 6), [('new',)]),
1145
self.assertEqual([(('new', ), 'N50 60', ((('separate',),),)),
1146
(('new2', ), ' 0 6', ((('new',),),))],
1147
sorted(self.caught_entries[0]))
1148
self.assertEqual(1, len(self.caught_entries))
1150
def test_add_versions_deltas(self):
1151
index = self.two_graph_index(deltas=True, catch_adds=True)
1153
(('new',), 'fulltext,no-eol', (None, 50, 60), [('separate',)]),
1154
(('new2',), 'line-delta', (None, 0, 6), [('new',)]),
1156
self.assertEqual([(('new', ), 'N50 60', ((('separate',),), ())),
1157
(('new2', ), ' 0 6', ((('new',),), (('new',),), ))],
1158
sorted(self.caught_entries[0]))
1159
self.assertEqual(1, len(self.caught_entries))
1161
def test_add_versions_delta_not_delta_index(self):
1162
index = self.two_graph_index(catch_adds=True)
1163
self.assertRaises(errors.KnitCorrupt, index.add_records,
1164
[(('new',), 'no-eol,line-delta', (None, 0, 100), [('parent',)])])
1165
self.assertEqual([], self.caught_entries)
1167
def test_add_versions_random_id_accepted(self):
1168
index = self.two_graph_index(catch_adds=True)
1169
index.add_records([], random_id=True)
1171
def test_add_versions_same_dup(self):
1172
index = self.two_graph_index(catch_adds=True)
1173
# options can be spelt two different ways
1174
index.add_records([(('tip',), 'fulltext,no-eol', (None, 0, 100),
1176
index.add_records([(('tip',), 'no-eol,fulltext', (None, 0, 100),
1178
# position/length are ignored (because each pack could have fulltext or
1179
# delta, and be at a different position.
1180
index.add_records([(('tip',), 'fulltext,no-eol', (None, 50, 100),
1182
index.add_records([(('tip',), 'fulltext,no-eol', (None, 0, 1000),
1184
# but neither should have added data.
1185
self.assertEqual([[], [], [], []], self.caught_entries)
1187
def test_add_versions_different_dup(self):
1188
index = self.two_graph_index(deltas=True, catch_adds=True)
1190
self.assertRaises(errors.KnitCorrupt, index.add_records,
1191
[(('tip',), 'no-eol,line-delta', (None, 0, 100), [('parent',)])])
1192
self.assertRaises(errors.KnitCorrupt, index.add_records,
1193
[(('tip',), 'line-delta,no-eol', (None, 0, 100), [('parent',)])])
1194
self.assertRaises(errors.KnitCorrupt, index.add_records,
1195
[(('tip',), 'fulltext', (None, 0, 100), [('parent',)])])
1197
self.assertRaises(errors.KnitCorrupt, index.add_records,
1198
[(('tip',), 'fulltext,no-eol', (None, 0, 100), [])])
1199
# change options in the second record
1200
self.assertRaises(errors.KnitCorrupt, index.add_records,
1201
[(('tip',), 'fulltext,no-eol', (None, 0, 100), [('parent',)]),
1202
(('tip',), 'no-eol,line-delta', (None, 0, 100), [('parent',)])])
1203
self.assertEqual([], self.caught_entries)
1206
class TestNoParentsGraphIndexKnit(KnitTests):
1207
"""Tests for knits using _KnitGraphIndex with no parents."""
1209
def make_g_index(self, name, ref_lists=0, nodes=[]):
1210
builder = GraphIndexBuilder(ref_lists)
1211
for node, references in nodes:
1212
builder.add_node(node, references)
1213
stream = builder.finish()
1214
trans = self.get_transport()
1215
size = trans.put_file(name, stream)
1216
return GraphIndex(trans, name, size)
1218
def test_parents_deltas_incompatible(self):
1219
index = CombinedGraphIndex([])
1220
self.assertRaises(errors.KnitError, _KnitGraphIndex, lambda:True,
1221
index, deltas=True, parents=False)
1223
def two_graph_index(self, catch_adds=False):
1224
"""Build a two-graph index.
1226
:param deltas: If true, use underlying indices with two node-ref
1227
lists and 'parent' set to a delta-compressed against tail.
1229
# put several versions in the index.
1230
index1 = self.make_g_index('1', 0, [
1231
(('tip', ), 'N0 100'),
1233
index2 = self.make_g_index('2', 0, [
1234
(('parent', ), ' 100 78'),
1235
(('separate', ), '')])
1236
combined_index = CombinedGraphIndex([index1, index2])
1238
self.combined_index = combined_index
1239
self.caught_entries = []
1240
add_callback = self.catch_add
1243
return _KnitGraphIndex(combined_index, lambda:True, parents=False,
1244
add_callback=add_callback)
1246
def test_keys(self):
1247
index = self.two_graph_index()
1248
self.assertEqual(set([('tail',), ('tip',), ('parent',), ('separate',)]),
1251
def test_get_position(self):
1252
index = self.two_graph_index()
1253
self.assertEqual((index._graph_index._indices[0], 0, 100),
1254
index.get_position(('tip',)))
1255
self.assertEqual((index._graph_index._indices[1], 100, 78),
1256
index.get_position(('parent',)))
1258
def test_get_method(self):
1259
index = self.two_graph_index()
1260
self.assertEqual('fulltext', index.get_method(('tip',)))
1261
self.assertEqual(['fulltext'], index.get_options(('parent',)))
1263
def test_get_options(self):
1264
index = self.two_graph_index()
1265
self.assertEqual(['fulltext', 'no-eol'], index.get_options(('tip',)))
1266
self.assertEqual(['fulltext'], index.get_options(('parent',)))
1268
def test_get_parent_map(self):
1269
index = self.two_graph_index()
1270
self.assertEqual({('parent',):None},
1271
index.get_parent_map([('parent',), ('ghost',)]))
1273
def catch_add(self, entries):
1274
self.caught_entries.append(entries)
1276
def test_add_no_callback_errors(self):
1277
index = self.two_graph_index()
1278
self.assertRaises(errors.ReadOnlyError, index.add_records,
1279
[(('new',), 'fulltext,no-eol', (None, 50, 60), [('separate',)])])
1281
def test_add_version_smoke(self):
1282
index = self.two_graph_index(catch_adds=True)
1283
index.add_records([(('new',), 'fulltext,no-eol', (None, 50, 60), [])])
1284
self.assertEqual([[(('new', ), 'N50 60')]],
1285
self.caught_entries)
1287
def test_add_version_delta_not_delta_index(self):
1288
index = self.two_graph_index(catch_adds=True)
1289
self.assertRaises(errors.KnitCorrupt, index.add_records,
1290
[(('new',), 'no-eol,line-delta', (None, 0, 100), [])])
1291
self.assertEqual([], self.caught_entries)
1293
def test_add_version_same_dup(self):
1294
index = self.two_graph_index(catch_adds=True)
1295
# options can be spelt two different ways
1296
index.add_records([(('tip',), 'fulltext,no-eol', (None, 0, 100), [])])
1297
index.add_records([(('tip',), 'no-eol,fulltext', (None, 0, 100), [])])
1298
# position/length are ignored (because each pack could have fulltext or
1299
# delta, and be at a different position.
1300
index.add_records([(('tip',), 'fulltext,no-eol', (None, 50, 100), [])])
1301
index.add_records([(('tip',), 'fulltext,no-eol', (None, 0, 1000), [])])
1302
# but neither should have added data.
1303
self.assertEqual([[], [], [], []], self.caught_entries)
1305
def test_add_version_different_dup(self):
1306
index = self.two_graph_index(catch_adds=True)
1308
self.assertRaises(errors.KnitCorrupt, index.add_records,
1309
[(('tip',), 'no-eol,line-delta', (None, 0, 100), [])])
1310
self.assertRaises(errors.KnitCorrupt, index.add_records,
1311
[(('tip',), 'line-delta,no-eol', (None, 0, 100), [])])
1312
self.assertRaises(errors.KnitCorrupt, index.add_records,
1313
[(('tip',), 'fulltext', (None, 0, 100), [])])
1315
self.assertRaises(errors.KnitCorrupt, index.add_records,
1316
[(('tip',), 'fulltext,no-eol', (None, 0, 100), [('parent',)])])
1317
self.assertEqual([], self.caught_entries)
1319
def test_add_versions(self):
1320
index = self.two_graph_index(catch_adds=True)
1322
(('new',), 'fulltext,no-eol', (None, 50, 60), []),
1323
(('new2',), 'fulltext', (None, 0, 6), []),
1325
self.assertEqual([(('new', ), 'N50 60'), (('new2', ), ' 0 6')],
1326
sorted(self.caught_entries[0]))
1327
self.assertEqual(1, len(self.caught_entries))
1329
def test_add_versions_delta_not_delta_index(self):
1330
index = self.two_graph_index(catch_adds=True)
1331
self.assertRaises(errors.KnitCorrupt, index.add_records,
1332
[(('new',), 'no-eol,line-delta', (None, 0, 100), [('parent',)])])
1333
self.assertEqual([], self.caught_entries)
1335
def test_add_versions_parents_not_parents_index(self):
1336
index = self.two_graph_index(catch_adds=True)
1337
self.assertRaises(errors.KnitCorrupt, index.add_records,
1338
[(('new',), 'no-eol,fulltext', (None, 0, 100), [('parent',)])])
1339
self.assertEqual([], self.caught_entries)
1341
def test_add_versions_random_id_accepted(self):
1342
index = self.two_graph_index(catch_adds=True)
1343
index.add_records([], random_id=True)
1345
def test_add_versions_same_dup(self):
1346
index = self.two_graph_index(catch_adds=True)
1347
# options can be spelt two different ways
1348
index.add_records([(('tip',), 'fulltext,no-eol', (None, 0, 100), [])])
1349
index.add_records([(('tip',), 'no-eol,fulltext', (None, 0, 100), [])])
1350
# position/length are ignored (because each pack could have fulltext or
1351
# delta, and be at a different position.
1352
index.add_records([(('tip',), 'fulltext,no-eol', (None, 50, 100), [])])
1353
index.add_records([(('tip',), 'fulltext,no-eol', (None, 0, 1000), [])])
1354
# but neither should have added data.
1355
self.assertEqual([[], [], [], []], self.caught_entries)
1357
def test_add_versions_different_dup(self):
1358
index = self.two_graph_index(catch_adds=True)
1360
self.assertRaises(errors.KnitCorrupt, index.add_records,
1361
[(('tip',), 'no-eol,line-delta', (None, 0, 100), [])])
1362
self.assertRaises(errors.KnitCorrupt, index.add_records,
1363
[(('tip',), 'line-delta,no-eol', (None, 0, 100), [])])
1364
self.assertRaises(errors.KnitCorrupt, index.add_records,
1365
[(('tip',), 'fulltext', (None, 0, 100), [])])
1367
self.assertRaises(errors.KnitCorrupt, index.add_records,
1368
[(('tip',), 'fulltext,no-eol', (None, 0, 100), [('parent',)])])
1369
# change options in the second record
1370
self.assertRaises(errors.KnitCorrupt, index.add_records,
1371
[(('tip',), 'fulltext,no-eol', (None, 0, 100), []),
1372
(('tip',), 'no-eol,line-delta', (None, 0, 100), [])])
1373
self.assertEqual([], self.caught_entries)