1
# Copyright (C) 2005, 2006 Canonical Ltd
3
# This program is free software; you can redistribute it and/or modify
4
# it under the terms of the GNU General Public License as published by
5
# the Free Software Foundation; either version 2 of the License, or
6
# (at your option) any later version.
8
# This program is distributed in the hope that it will be useful,
9
# but WITHOUT ANY WARRANTY; without even the implied warranty of
10
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11
# GNU General Public License for more details.
13
# You should have received a copy of the GNU General Public License
14
# along with this program; if not, write to the Free Software
15
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
17
"""Tests for Knit data structure"""
23
from bzrlib import errors
24
from bzrlib.errors import KnitError, RevisionAlreadyPresent, NoSuchFile
25
from bzrlib.knit import (
31
from bzrlib.osutils import split_lines
32
from bzrlib.tests import TestCase, TestCaseWithTransport
33
from bzrlib.transport import TransportLogger, get_transport
34
from bzrlib.transport.memory import MemoryTransport
35
from bzrlib.weave import Weave
38
class KnitContentTests(TestCase):
40
def test_constructor(self):
    """A KnitContent can be constructed from an empty list of lines."""
    KnitContent([])

def test_text(self):
    """text() returns just the text half of every (origin, text) pair."""
    # Kept separate from test_constructor so that a text() failure is
    # reported against the method that is actually under test.
    content = KnitContent([])
    self.assertEqual(content.text(), [])

    content = KnitContent([("origin1", "text1"), ("origin2", "text2")])
    self.assertEqual(content.text(), ["text1", "text2"])
50
def test_annotate(self):
    """annotate() hands back the (origin, text) pairs the content holds."""
    empty = KnitContent([])
    self.assertEqual(empty.annotate(), [])

    populated = KnitContent([("origin1", "text1"), ("origin2", "text2")])
    annotated = populated.annotate()
    self.assertEqual(annotated,
                     [("origin1", "text1"), ("origin2", "text2")])
58
def test_annotate_iter(self):
    """annotate_iter() yields each (origin, text) pair and then stops."""
    content = KnitContent([])
    it = content.annotate_iter()
    self.assertRaises(StopIteration, it.next)

    content = KnitContent([("origin1", "text1"), ("origin2", "text2")])
    it = content.annotate_iter()
    self.assertEqual(it.next(), ("origin1", "text1"))
    self.assertEqual(it.next(), ("origin2", "text2"))
    self.assertRaises(StopIteration, it.next)

def test_copy(self):
    """copy() returns a KnitContent carrying the same annotated lines."""
    content = KnitContent([("origin1", "text1"), ("origin2", "text2")])
    # The assertions below need the copy to be bound first; without this
    # assignment ``copy`` is an undefined name.
    # NOTE(review): assumes KnitContent exposes copy() -- confirm.
    copy = content.copy()
    self.assertIsInstance(copy, KnitContent)
    self.assertEqual(copy.annotate(),
                     [("origin1", "text1"), ("origin2", "text2")])
76
def test_line_delta(self):
    """line_delta() reports the single changed hunk between two contents."""
    old = KnitContent([("", "a"), ("", "b")])
    new = KnitContent([("", "a"), ("", "a"), ("", "c")])
    delta = old.line_delta(new)
    self.assertEqual(delta,
                     [(1, 2, 2, [("", "a"), ("", "c")])])
82
def test_line_delta_iter(self):
    """line_delta_iter() yields the one changed hunk and is then exhausted."""
    old = KnitContent([("", "a"), ("", "b")])
    new = KnitContent([("", "a"), ("", "a"), ("", "c")])
    hunks = old.line_delta_iter(new)
    first_hunk = hunks.next()
    self.assertEqual(first_hunk, (1, 2, 2, [("", "a"), ("", "c")]))
    self.assertRaises(StopIteration, hunks.next)
90
class KnitTests(TestCaseWithTransport):
91
"""Class containing knit test helper routines."""
93
def make_test_knit(self, annotate=False, delay_create=False):
    """Create a writable knit named 'test' on the transport for '.'.

    annotate -- when true the knit uses KnitAnnotateFactory, otherwise
        KnitPlainFactory
    delay_create -- passed straight through to KnitVersionedFile
    """
    # Honour ``annotate``: callers that pass True go on to call annotate()
    # or to assert that the factory is a KnitAnnotateFactory.
    if annotate:
        factory = KnitAnnotateFactory()
    else:
        factory = KnitPlainFactory()
    return KnitVersionedFile('test', get_transport('.'), access_mode='w',
                             factory=factory, create=True,
                             delay_create=delay_create)
103
class BasicKnitTests(KnitTests):
105
def add_stock_one_and_one_a(self, k):
    """Add TEXT_1 as 'text-1' and TEXT_1A as its child 'text-1a' to k."""
    stock_versions = (
        ('text-1', [], TEXT_1),
        ('text-1a', ['text-1'], TEXT_1A),
    )
    for version_id, parents, text in stock_versions:
        k.add_lines(version_id, parents, split_lines(text))
109
def test_knit_constructor(self):
    """An empty knit can be constructed without error."""
    # Only construction is exercised; the resulting knit is not inspected.
    self.make_test_knit()
113
def test_knit_add(self):
    """A single text stored in a knit can be read back unchanged."""
    knit = self.make_test_knit()
    knit.add_lines('text-1', [], split_lines(TEXT_1))
    self.assertTrue(knit.has_version('text-1'))
    stored = ''.join(knit.get_lines('text-1'))
    self.assertEqualDiff(stored, TEXT_1)
120
def test_knit_reload(self):
    """Content added through one knit object is visible to a reopened one."""
    writer = self.make_test_knit()
    writer.add_lines('text-1', [], split_lines(TEXT_1))
    # Open the same knit again, read-only, and read the text back.
    reader = KnitVersionedFile('test', get_transport('.'), access_mode='r',
                               factory=KnitPlainFactory(), create=True)
    self.assertTrue(reader.has_version('text-1'))
    reloaded = ''.join(reader.get_lines('text-1'))
    self.assertEqualDiff(reloaded, TEXT_1)
129
def test_knit_several(self):
    """Two unrelated texts stored in one knit are both retrievable."""
    knit = self.make_test_knit()
    knit.add_lines('text-1', [], split_lines(TEXT_1))
    knit.add_lines('text-2', [], split_lines(TEXT_2))
    first = ''.join(knit.get_lines('text-1'))
    self.assertEqualDiff(first, TEXT_1)
    second = ''.join(knit.get_lines('text-2'))
    self.assertEqualDiff(second, TEXT_2)
137
def test_repeated_add(self):
    """Knit traps attempt to replace existing version"""
    k = self.make_test_knit()
    k.add_lines('text-1', [], split_lines(TEXT_1))
    # assertRaises takes the callable right after the exception class.
    # Without it the string 'text-1' would be "called" and the test would
    # die with a TypeError instead of checking RevisionAlreadyPresent.
    self.assertRaises(RevisionAlreadyPresent,
                      k.add_lines,
                      'text-1', [], split_lines(TEXT_1))
145
def test_empty(self):
    """A version with no lines at all round-trips as an empty list."""
    knit = self.make_test_knit(True)
    knit.add_lines('text-1', [], [])
    stored_lines = knit.get_lines('text-1')
    self.assertEquals(stored_lines, [])
150
def test_incomplete(self):
    """Test if texts without a ending line-end can be inserted and
    extracted."""
    # The docstring above must be terminated; left open it swallows the
    # rest of the method as string content.
    k = KnitVersionedFile('test', get_transport('.'), delta=False, create=True)
    k.add_lines('text-1', [], ['a\n', 'b' ])
    k.add_lines('text-2', ['text-1'], ['a\rb\n', 'b\n'])
    # reopening ensures maximum room for confusion
    k = KnitVersionedFile('test', get_transport('.'), delta=False, create=True)
    self.assertEquals(k.get_lines('text-1'), ['a\n', 'b' ])
    self.assertEquals(k.get_lines('text-2'), ['a\rb\n', 'b\n'])
161
def test_delta(self):
    """The line delta TEXT_1 -> TEXT_1A matches delta_1_1a and re-applies."""
    # The knit itself is not inspected; it is still created as before.
    k = self.make_test_knit()
    delta_lines = list(line_delta(TEXT_1.splitlines(True),
                                  TEXT_1A.splitlines(True)))
    self.assertEqualDiff(''.join(delta_lines), delta_1_1a)
    rebuilt = apply_line_delta(TEXT_1.splitlines(True), delta_lines)
    self.assertEqualDiff(''.join(rebuilt), TEXT_1A)
170
def test_add_with_parents(self):
    """Parents recorded at add time are returned by get_parents()."""
    knit = self.make_test_knit()
    self.add_stock_one_and_one_a(knit)
    root_parents = knit.get_parents('text-1')
    self.assertEquals(root_parents, [])
    child_parents = knit.get_parents('text-1a')
    self.assertEquals(child_parents, ['text-1'])
177
def test_ancestry(self):
    """get_ancestry() of a child covers the child and its parent."""
    knit = self.make_test_knit()
    self.add_stock_one_and_one_a(knit)
    ancestry = set(knit.get_ancestry(['text-1a']))
    self.assertEquals(ancestry, set(['text-1a', 'text-1']))
183
def test_add_delta(self):
    """A child text stored in a delta=True knit reads back in full."""
    knit = KnitVersionedFile('test', get_transport('.'),
                             factory=KnitPlainFactory(),
                             delta=True, create=True)
    self.add_stock_one_and_one_a(knit)
    child_text = ''.join(knit.get_lines('text-1a'))
    self.assertEqualDiff(child_text, TEXT_1A)
191
def test_annotate(self):
    """Small annotate scenario on an annotated knit created with delta=True."""
    annotated_knit = KnitVersionedFile('knit', get_transport('.'),
                                       factory=KnitAnnotateFactory(),
                                       delta=True, create=True)
    self.insert_and_test_small_annotate(annotated_knit)
197
def insert_and_test_small_annotate(self, k):
    """Add a parent and a child text to k and check the child's annotation."""
    k.add_lines('text-1', [], ['a\n', 'b\n'])
    k.add_lines('text-2', ['text-1'], ['a\n', 'c\n'])

    origins = k.annotate('text-2')
    # The first line is inherited from text-1, the second is new in text-2.
    expected = [('text-1', 'a\n'), ('text-2', 'c\n')]
    for position, pair in enumerate(expected):
        self.assertEquals(origins[position], pair)
206
def test_annotate_fulltext(self):
    """Small annotate scenario on an annotated knit created with delta=False."""
    fulltext_knit = KnitVersionedFile('knit', get_transport('.'),
                                      factory=KnitAnnotateFactory(),
                                      delta=False, create=True)
    self.insert_and_test_small_annotate(fulltext_knit)
212
def test_annotate_merge_1(self):
    """Each line of a two-parent merge is attributed to the parent it matches."""
    knit = self.make_test_knit(True)
    knit.add_lines('text-a1', [], ['a\n', 'b\n'])
    knit.add_lines('text-a2', [], ['d\n', 'c\n'])
    knit.add_lines('text-am', ['text-a1', 'text-a2'], ['d\n', 'b\n'])
    origins = knit.annotate('text-am')
    expected = [('text-a2', 'd\n'), ('text-a1', 'b\n')]
    for position, pair in enumerate(expected):
        self.assertEquals(origins[position], pair)
221
def test_annotate_merge_2(self):
    """A merge interleaving lines from both parents is annotated per line."""
    knit = self.make_test_knit(True)
    knit.add_lines('text-a1', [], ['a\n', 'b\n', 'c\n'])
    knit.add_lines('text-a2', [], ['x\n', 'y\n', 'z\n'])
    knit.add_lines('text-am', ['text-a1', 'text-a2'], ['a\n', 'y\n', 'c\n'])
    origins = knit.annotate('text-am')
    expected = [('text-a1', 'a\n'), ('text-a2', 'y\n'), ('text-a1', 'c\n')]
    for position, pair in enumerate(expected):
        self.assertEquals(origins[position], pair)
231
def test_annotate_merge_9(self):
    """A line new in the merge is attributed to the merge version itself."""
    knit = self.make_test_knit(True)
    knit.add_lines('text-a1', [], ['a\n', 'b\n', 'c\n'])
    knit.add_lines('text-a2', [], ['x\n', 'y\n', 'z\n'])
    knit.add_lines('text-am', ['text-a1', 'text-a2'], ['k\n', 'y\n', 'c\n'])
    origins = knit.annotate('text-am')
    expected = [('text-am', 'k\n'), ('text-a2', 'y\n'), ('text-a1', 'c\n')]
    for position, pair in enumerate(expected):
        self.assertEquals(origins[position], pair)
241
def test_annotate_merge_3(self):
    """One new line followed by two lines taken from the second parent."""
    knit = self.make_test_knit(True)
    knit.add_lines('text-a1', [], ['a\n', 'b\n', 'c\n'])
    knit.add_lines('text-a2', [], ['x\n', 'y\n', 'z\n'])
    knit.add_lines('text-am', ['text-a1', 'text-a2'], ['k\n', 'y\n', 'z\n'])
    origins = knit.annotate('text-am')
    expected = [('text-am', 'k\n'), ('text-a2', 'y\n'), ('text-a2', 'z\n')]
    for position, pair in enumerate(expected):
        self.assertEquals(origins[position], pair)
251
def test_annotate_merge_4(self):
    """Lines reaching the merge via an intermediate keep their first origin."""
    knit = self.make_test_knit(True)
    knit.add_lines('text-a1', [], ['a\n', 'b\n', 'c\n'])
    knit.add_lines('text-a2', [], ['x\n', 'y\n', 'z\n'])
    knit.add_lines('text-a3', ['text-a1'], ['a\n', 'b\n', 'p\n'])
    knit.add_lines('text-am', ['text-a2', 'text-a3'], ['a\n', 'b\n', 'z\n'])
    origins = knit.annotate('text-am')
    expected = [('text-a1', 'a\n'), ('text-a1', 'b\n'), ('text-a2', 'z\n')]
    for position, pair in enumerate(expected):
        self.assertEquals(origins[position], pair)
262
def test_annotate_merge_5(self):
    """A three-parent merge taking one line from each parent."""
    knit = self.make_test_knit(True)
    knit.add_lines('text-a1', [], ['a\n', 'b\n', 'c\n'])
    knit.add_lines('text-a2', [], ['d\n', 'e\n', 'f\n'])
    knit.add_lines('text-a3', [], ['x\n', 'y\n', 'z\n'])
    merge_parents = ['text-a1', 'text-a2', 'text-a3']
    knit.add_lines('text-am', merge_parents, ['a\n', 'e\n', 'z\n'])
    origins = knit.annotate('text-am')
    expected = [('text-a1', 'a\n'), ('text-a2', 'e\n'), ('text-a3', 'z\n')]
    for position, pair in enumerate(expected):
        self.assertEquals(origins[position], pair)
275
def test_annotate_file_cherry_pick(self):
    """Reverting to the grandparent's text attributes lines to text-1."""
    knit = self.make_test_knit(True)
    knit.add_lines('text-1', [], ['a\n', 'b\n', 'c\n'])
    knit.add_lines('text-2', ['text-1'], ['d\n', 'e\n', 'f\n'])
    knit.add_lines('text-3', ['text-2', 'text-1'], ['a\n', 'b\n', 'c\n'])
    origins = knit.annotate('text-3')
    expected = [('text-1', 'a\n'), ('text-1', 'b\n'), ('text-1', 'c\n')]
    for position, pair in enumerate(expected):
        self.assertEquals(origins[position], pair)
285
def test_knit_join(self):
    """Joining one version copies it and its whole ancestry (five versions)."""
    source = KnitVersionedFile('test1', get_transport('.'),
                               factory=KnitPlainFactory(), create=True)
    # Two independent two-version chains, a->b and c->d, merged by m.
    graph = (
        ('text-a', []),
        ('text-b', ['text-a']),
        ('text-c', []),
        ('text-d', ['text-c']),
        ('text-m', ['text-b', 'text-d']),
    )
    for version_id, parents in graph:
        source.add_lines(version_id, parents, split_lines(TEXT_1))

    target = KnitVersionedFile('test2', get_transport('.'),
                               factory=KnitPlainFactory(), create=True)
    count = target.join(source, version_ids=['text-m'])
    self.assertEquals(count, 5)
    self.assertTrue(target.has_version('text-a'))
    self.assertTrue(target.has_version('text-c'))
302
def test_reannotate(self):
    """A version joined back into a diverged knit keeps correct annotations."""
    first = KnitVersionedFile('knit1', get_transport('.'),
                              factory=KnitAnnotateFactory(), create=True)
    first.add_lines('text-a', [], ['a\n', 'b\n'])
    first.add_lines('text-b', ['text-a'], ['a\n', 'c\n'])

    second = KnitVersionedFile('test2', get_transport('.'),
                               factory=KnitAnnotateFactory(), create=True)
    second.join(first, version_ids=['text-b'])

    # Let the two knits diverge before joining back.
    first.add_lines('text-X', ['text-b'], ['a\n', 'b\n'])
    second.add_lines('text-c', ['text-b'], ['z\n', 'c\n'])
    second.add_lines('text-Y', ['text-b'], ['b\n', 'c\n'])

    # test-c will have index 3
    first.join(second, version_ids=['text-c'])

    joined_lines = first.get_lines('text-c')
    self.assertEquals(joined_lines, ['z\n', 'c\n'])

    origins = first.annotate('text-c')
    expected = [('text-c', 'z\n'), ('text-b', 'c\n')]
    for position, pair in enumerate(expected):
        self.assertEquals(origins[position], pair)
331
def test_get_line_delta_texts(self):
    """Make sure we can call get_texts on text with reused line deltas"""
    k1 = KnitVersionedFile('test1', get_transport('.'),
                           factory=KnitPlainFactory(), create=True)
    # Build a chain '0' <- '1' <- '2'. ``t`` and ``parents`` must be bound
    # by a loop before they are used; version '0' has no parent.
    for t in range(3):
        if t == 0:
            parents = []
        else:
            parents = ['%d' % (t-1)]
        k1.add_lines('%d' % t, parents, ['hello\n'] * t)
    k1.get_texts(('%d' % t) for t in range(3))
343
def test_iter_lines_reads_in_order(self):
    """Lines requested last-first are still read from the file in order."""
    backing = MemoryTransport()
    logged = TransportLogger(backing)
    knit = KnitVersionedFile('id', logged, create=True, delta=True)
    self.assertEqual([('id.kndx',)], logged._calls)
    # add texts with no required ordering
    knit.add_lines('base', [], ['text\n'])
    knit.add_lines('base2', [], ['text2\n'])
    # Forget the calls made so far so only the read below is recorded.
    logged._calls = []
    # request a last-first iteration
    requested = ['base2', 'base']
    results = list(knit.iter_lines_added_or_present_in_versions(requested))
    self.assertEqual([('id.knit', [(0, 87), (87, 89)])], logged._calls)
    self.assertEqual(['text\n', 'text2\n'], results)
358
def test_create_empty_annotated(self):
    """create_empty() on an annotated knit keeps factory type and delta flag."""
    original = self.make_test_knit(True)
    original.add_lines('text-a', [], ['a\n', 'b\n'])
    empty = original.create_empty('t', MemoryTransport())
    factory_is_annotating = isinstance(empty.factory, KnitAnnotateFactory)
    self.assertTrue(factory_is_annotating)
    self.assertEqual(original.delta, empty.delta)
    # the generic test checks for empty content and file class
367
def test_knit_format(self):
    """A new knit index has the expected content as records are added."""
    # this tests that a new knit index file has the expected content
    # and that is writes the data we expect as records are added.
    knit = self.make_test_knit(True)
    # Now knit files are not created until we first add data to them
    self.assertFileEqual("# bzr knit index 8\n", 'test.kndx')
    knit.add_lines_with_ghosts('revid', ['a_ghost'], ['a\n'])
    # The expected content is the prefix of the next check below, since
    # the second add only appends a record to the index.
    self.assertFileEqual(
        "# bzr knit index 8\n"
        "\n"
        "revid fulltext 0 84 .a_ghost :",
        'test.kndx')
    knit.add_lines_with_ghosts('revid2', ['revid'], ['a\n'])
    self.assertFileEqual(
        "# bzr knit index 8\n"
        "\nrevid fulltext 0 84 .a_ghost :"
        "\nrevid2 line-delta 84 82 0 :",
        'test.kndx')
    # we should be able to load this file again
    knit = KnitVersionedFile('test', get_transport('.'), access_mode='r')
    self.assertEqual(['revid', 'revid2'], knit.versions())
    # write a short write to the file and ensure that its ignored
    indexfile = file('test.kndx', 'at')
    try:
        indexfile.write('\nrevid3 line-delta 166 82 1 2 3 4 5 .phwoar:demo ')
    finally:
        # Close so the partial record is on disk before the index is
        # reloaded, and so the handle is not leaked.
        indexfile.close()
    # we should be able to load this file again
    knit = KnitVersionedFile('test', get_transport('.'), access_mode='w')
    self.assertEqual(['revid', 'revid2'], knit.versions())
    # and add a revision with the same id the failed write had
    knit.add_lines('revid3', ['revid2'], ['a\n'])
    # and when reading it revid3 should now appear.
    knit = KnitVersionedFile('test', get_transport('.'), access_mode='r')
    self.assertEqual(['revid', 'revid2', 'revid3'], knit.versions())
    self.assertEqual(['revid2'], knit.get_parents('revid3'))
402
def test_delay_create(self):
    """Test that passing delay_create=True creates files late"""
    knit = self.make_test_knit(annotate=True, delay_create=True)
    self.failIfExists('test.knit')
    self.failIfExists('test.kndx')
    knit.add_lines_with_ghosts('revid', ['a_ghost'], ['a\n'])
    self.failUnlessExists('test.knit')
    # assertFileEqual takes the expected content followed by the path of
    # the file to compare against.
    self.assertFileEqual(
        "# bzr knit index 8\n"
        "\n"
        "revid fulltext 0 84 .a_ghost :",
        'test.kndx')
415
def test_create_parent_dir(self):
    """create_parent_dir can create knits in nonexistant dirs"""
    # Has no effect if we don't set 'delay_create'
    trans = get_transport('.')
    self.assertRaises(NoSuchFile, KnitVersionedFile, 'dir/test',
                      trans, access_mode='w', factory=None,
                      create=True, create_parent_dir=True)
    # Nothing should have changed yet
    # This second construction must set delay_create, otherwise it is the
    # same call that was just shown to raise NoSuchFile.
    knit = KnitVersionedFile('dir/test', trans, access_mode='w',
                             factory=None, create=True,
                             create_parent_dir=True,
                             delay_create=True)
    self.failIfExists('dir/test.knit')
    self.failIfExists('dir/test.kndx')
    self.failIfExists('dir')
    knit.add_lines('revid', [], ['a\n'])
    self.failUnlessExists('dir')
    self.failUnlessExists('dir/test.knit')
    # NOTE(review): the record line is reproduced exactly as it appears in
    # this file; confirm the spacing before ':' for a parentless version.
    self.assertFileEqual(
        "# bzr knit index 8\n"
        "\n"
        "revid fulltext 0 84 :",
        'dir/test.kndx')
439
def test_create_mode_700(self):
440
trans = get_transport('.')
441
if not trans._can_roundtrip_unix_modebits():
442
# Can't roundtrip, so no need to run this test
444
knit = KnitVersionedFile('dir/test', trans, access_mode='w',
445
factory=None, create=True,
446
create_parent_dir=True,
450
knit.add_lines('revid', [], ['a\n'])
451
self.assertTransportMode(trans, 'dir', 0700)
452
self.assertTransportMode(trans, 'dir/test.knit', 0600)
453
self.assertTransportMode(trans, 'dir/test.kndx', 0600)
455
def test_create_mode_770(self):
456
trans = get_transport('.')
457
if not trans._can_roundtrip_unix_modebits():
458
# Can't roundtrip, so no need to run this test
460
knit = KnitVersionedFile('dir/test', trans, access_mode='w',
461
factory=None, create=True,
462
create_parent_dir=True,
466
knit.add_lines('revid', [], ['a\n'])
467
self.assertTransportMode(trans, 'dir', 0770)
468
self.assertTransportMode(trans, 'dir/test.knit', 0660)
469
self.assertTransportMode(trans, 'dir/test.kndx', 0660)
471
def test_create_mode_777(self):
472
trans = get_transport('.')
473
if not trans._can_roundtrip_unix_modebits():
474
# Can't roundtrip, so no need to run this test
476
knit = KnitVersionedFile('dir/test', trans, access_mode='w',
477
factory=None, create=True,
478
create_parent_dir=True,
482
knit.add_lines('revid', [], ['a\n'])
483
self.assertTransportMode(trans, 'dir', 0777)
484
self.assertTransportMode(trans, 'dir/test.knit', 0666)
485
self.assertTransportMode(trans, 'dir/test.kndx', 0666)
487
def test_plan_merge(self):
    """plan_merge() of the two TEXT_1 children matches AB_MERGE line by line."""
    my_knit = self.make_test_knit(annotate=True)
    texts = (
        ('text1', [], TEXT_1),
        ('text1a', ['text1'], TEXT_1A),
        ('text1b', ['text1'], TEXT_1B),
    )
    for version_id, parents, text in texts:
        my_knit.add_lines(version_id, parents, split_lines(text))
    plan = list(my_knit.plan_merge('text1a', 'text1b'))
    # Pairs are compared only up to the shorter of the two sequences.
    for actual, expected in zip(plan, AB_MERGE):
        self.assertEqual(actual, expected)
506
Banana cup cake recipe
516
Banana cup cake recipe
518
- bananas (do not use plantains!!!)
525
Banana cup cake recipe
541
AB_MERGE_TEXT="""unchanged|Banana cup cake recipe
546
new-b|- bananas (do not use plantains!!!)
547
unchanged|- broken tea cups
548
new-a|- self-raising flour
551
AB_MERGE=[tuple(l.split('|')) for l in AB_MERGE_TEXT.splitlines(True)]
554
def line_delta(from_lines, to_lines):
    """Generate line-based delta from one text to another

    from_lines -- list of lines of the basis text
    to_lines -- list of lines of the target text

    For every difflib opcode that is not 'equal', yields a header line
    'start,end,count\\n' (the range of from_lines being replaced and the
    number of replacement lines) followed by those lines from to_lines.
    """
    s = difflib.SequenceMatcher(None, from_lines, to_lines)
    for op in s.get_opcodes():
        # Unchanged runs carry no information in a delta.
        if op[0] == 'equal':
            continue
        yield '%d,%d,%d\n' % (op[1], op[2], op[4]-op[3])
        for i in range(op[3], op[4]):
            yield to_lines[i]
565
def apply_line_delta(basis_lines, delta_lines):
    """Apply a line-based perfect diff

    basis_lines -- text to apply the patch to
    delta_lines -- diff instructions and content

    Each instruction is a header line 'a,b,c' followed by c content lines
    that replace basis_lines[a:b]. Returns the patched text as a new list;
    basis_lines itself is left untouched.
    """
    out = basis_lines[:]
    i = 0
    offset = 0
    while i < len(delta_lines):
        l = delta_lines[i]
        a, b, c = map(int, l.split(','))
        i = i + 1
        out[offset+a:offset+b] = delta_lines[i:i+c]
        i = i + c
        # Headers are in basis coordinates: c lines took the place of
        # (b - a) lines, so every later hunk shifts by the difference.
        offset = offset + c - (b - a)
    return out
584
class TestWeaveToKnit(KnitTests):
586
def test_weave_to_knit_matches(self):
    """WeaveToKnit is compatible only in the Weave -> Knit direction."""
    # check that the WeaveToKnit is_compatible function
    # registers True for a Weave to a Knit.
    # ``w`` has to be bound before the checks below can use it.
    w = Weave()
    k = self.make_test_knit()
    self.failUnless(WeaveToKnit.is_compatible(w, k))
    self.failIf(WeaveToKnit.is_compatible(k, w))
    self.failIf(WeaveToKnit.is_compatible(w, w))
    self.failIf(WeaveToKnit.is_compatible(k, k))
597
class TestKnitCaching(KnitTests):
599
def create_knit(self, cache_add=False):
    """Return an annotated test knit holding 'text-1' and 'text-2'.

    cache_add -- when true, caching is enabled before the texts are added,
        so the adds themselves populate the cache
    """
    k = self.make_test_knit(True)
    # NOTE(review): enable_cache() is the switch named by the comment in
    # test_no_caching -- confirm against bzrlib.knit.
    if cache_add:
        k.enable_cache()

    k.add_lines('text-1', [], split_lines(TEXT_1))
    k.add_lines('text-2', [], split_lines(TEXT_2))
    # Every caller uses the result, so the knit must be returned.
    return k
608
def test_no_caching(self):
    """A freshly created knit has an empty data cache."""
    knit = self.create_knit()
    # Nothing should be cached without setting 'enable_cache'
    cache = knit._data._cache
    self.assertEqual({}, cache)
613
def test_cache_add_and_clear(self):
    """Adds made with caching on are cached until the cache is cleared."""
    k = self.create_knit(True)

    self.assertEqual(['text-1', 'text-2'], sorted(k._data._cache.keys()))

    # The cache can only be empty again once it has been cleared.
    # NOTE(review): assumes the knit exposes clear_cache() -- confirm.
    k.clear_cache()
    self.assertEqual({}, k._data._cache)
621
def test_cache_data_read_raw(self):
    """Raw record reads are cached only while caching is enabled."""
    k = self.create_knit()

    # Now cache and read
    # NOTE(review): assumes enable_cache()/clear_cache() on the knit --
    # confirm against bzrlib.knit.
    k.enable_cache()

    def read_one_raw(version):
        """Read the single raw record for version and return it."""
        pos_map = k._get_components_positions([version])
        method, pos, size, _next = pos_map[version]
        lst = list(k._data.read_records_iter_raw([(version, pos, size)]))
        self.assertEqual(1, len(lst))
        # Callers index into the record, so it has to be returned.
        return lst[0]

    val = read_one_raw('text-1')
    self.assertEqual({'text-1':val[1]}, k._data._cache)

    k.clear_cache()
    # After clear, new reads are not cached
    self.assertEqual({}, k._data._cache)

    val2 = read_one_raw('text-1')
    self.assertEqual(val, val2)
    self.assertEqual({}, k._data._cache)
645
def test_cache_data_read(self):
    """Parsed record reads are cached only while caching is enabled."""
    k = self.create_knit()

    def read_one(version):
        """Read the single parsed record for version and return it."""
        pos_map = k._get_components_positions([version])
        method, pos, size, _next = pos_map[version]
        lst = list(k._data.read_records_iter([(version, pos, size)]))
        self.assertEqual(1, len(lst))
        # Callers index into the record, so it has to be returned.
        return lst[0]

    # Now cache and read
    # NOTE(review): assumes enable_cache()/clear_cache() on the knit --
    # confirm against bzrlib.knit.
    k.enable_cache()

    val = read_one('text-2')
    self.assertEqual(['text-2'], k._data._cache.keys())
    self.assertEqual('text-2', val[0])
    content, digest = k._data._parse_record('text-2',
                                            k._data._cache['text-2'])
    self.assertEqual(content, val[1])
    self.assertEqual(digest, val[2])

    k.clear_cache()
    self.assertEqual({}, k._data._cache)

    val2 = read_one('text-2')
    self.assertEqual(val, val2)
    self.assertEqual({}, k._data._cache)
673
def test_cache_read(self):
    """get_text() populates the cache only while caching is enabled."""
    k = self.create_knit()
    # Caching has to be switched on for the first read to be cached.
    # NOTE(review): assumes enable_cache()/clear_cache() on the knit --
    # confirm against bzrlib.knit.
    k.enable_cache()

    text = k.get_text('text-1')
    self.assertEqual(TEXT_1, text)
    self.assertEqual(['text-1'], k._data._cache.keys())

    k.clear_cache()
    self.assertEqual({}, k._data._cache)

    text = k.get_text('text-1')
    self.assertEqual(TEXT_1, text)
    self.assertEqual({}, k._data._cache)
689
class TestKnitIndex(KnitTests):
691
def test_add_versions_dictionary_compresses(self):
    """Adding versions to the index should update the lookup dict"""
    knit = self.make_test_knit()
    # NOTE(review): assumes the knit keeps its index on ``_index`` (by
    # analogy with ``_data`` used elsewhere in this file) -- confirm.
    idx = knit._index
    idx.add_version('a-1', ['fulltext'], 0, 0, [])
    # NOTE(review): record lines are reproduced exactly as they appear in
    # this file; confirm the spacing before ':' for a parentless version.
    self.check_file_contents('test.kndx',
        '# bzr knit index 8\n'
        '\n'
        'a-1 fulltext 0 0 :'
        )
    idx.add_versions([('a-2', ['fulltext'], 0, 0, ['a-1']),
                      ('a-3', ['fulltext'], 0, 0, ['a-2']),
                     ])
    # Parents are written as positions in the index (0, 1), not as ids.
    self.check_file_contents('test.kndx',
        '# bzr knit index 8\n'
        '\n'
        'a-1 fulltext 0 0 :\n'
        'a-2 fulltext 0 0 0 :\n'
        'a-3 fulltext 0 0 1 :'
        )
    self.assertEqual(['a-1', 'a-2', 'a-3'], idx._history)
    self.assertEqual({'a-1':('a-1', ['fulltext'], 0, 0, [], 0),
                      'a-2':('a-2', ['fulltext'], 0, 0, ['a-1'], 1),
                      'a-3':('a-3', ['fulltext'], 0, 0, ['a-2'], 2),
                     }, idx._cache)
717
def test_add_versions_fails_clean(self):
    """If add_versions fails in the middle, it restores a pristine state.

    Any modifications that are made to the index are reset if all versions
    cannot be added.
    """
    # This cheats a little bit by passing in a generator which will
    # raise an exception before the processing finishes
    # Other possibilities would be to have an version with the wrong number
    # of entries, or to make the backing transport unable to write any
    # files.

    knit = self.make_test_knit()
    # NOTE(review): assumes the knit keeps its index on ``_index`` (by
    # analogy with ``_data`` used elsewhere in this file) -- confirm.
    idx = knit._index
    idx.add_version('a-1', ['fulltext'], 0, 0, [])

    class StopEarly(Exception):
        """Raised by generate_failure to abort add_versions part-way."""
        pass

    def generate_failure():
        """Add some entries and then raise an exception"""
        yield ('a-2', ['fulltext'], 0, 0, ['a-1'])
        yield ('a-3', ['fulltext'], 0, 0, ['a-2'])
        # Without this raise the generator just ends and nothing fails.
        raise StopEarly()

    # Assert the pre-condition
    self.assertEqual(['a-1'], idx._history)
    self.assertEqual({'a-1':('a-1', ['fulltext'], 0, 0, [], 0)}, idx._cache)

    self.assertRaises(StopEarly, idx.add_versions, generate_failure())

    # And it shouldn't be modified
    self.assertEqual(['a-1'], idx._history)
    self.assertEqual({'a-1':('a-1', ['fulltext'], 0, 0, [], 0)}, idx._cache)
752
def test_knit_index_ignores_empty_files(self):
    """Creating a knit over a zero-length .kndx file does not raise."""
    # There was a race condition in older bzr, where a ^C at the right time
    # could leave an empty .kndx file, which bzr would later claim was a
    # corrupted file since the header was not present. In reality, the file
    # just wasn't created, so it should be ignored.
    transport = get_transport('.')
    transport.put_bytes('test.kndx', '')

    # Construction succeeding is the whole assertion.
    knit = self.make_test_knit()
762
def test_knit_index_checks_header(self):
    """A .kndx file with a bogus header raises KnitHeaderError."""
    transport = get_transport('.')
    bogus_index = '# not really a knit header\n\n'
    transport.put_bytes('test.kndx', bogus_index)

    self.assertRaises(errors.KnitHeaderError, self.make_test_knit)