1
# Copyright (C) 2005, 2006 Canonical Ltd
3
# This program is free software; you can redistribute it and/or modify
4
# it under the terms of the GNU General Public License as published by
5
# the Free Software Foundation; either version 2 of the License, or
6
# (at your option) any later version.
8
# This program is distributed in the hope that it will be useful,
9
# but WITHOUT ANY WARRANTY; without even the implied warranty of
10
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11
# GNU General Public License for more details.
13
# You should have received a copy of the GNU General Public License
14
# along with this program; if not, write to the Free Software
15
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
17
"""Tests for Knit data structure"""
23
from bzrlib.errors import KnitError, RevisionAlreadyPresent, NoSuchFile
24
from bzrlib.knit import (
30
from bzrlib.osutils import split_lines
31
from bzrlib.tests import TestCase, TestCaseWithTransport
32
from bzrlib.transport import TransportLogger, get_transport
33
from bzrlib.transport.memory import MemoryTransport
34
from bzrlib.weave import Weave
37
class KnitContentTests(TestCase):
39
def test_constructor(self):
    """Build KnitContent objects and check the text() each one reports."""
    # Constructing from an empty list must succeed on its own.
    KnitContent([])

    empty = KnitContent([])
    self.assertEqual(empty.text(), [])

    populated = KnitContent([("origin1", "text1"), ("origin2", "text2")])
    self.assertEqual(populated.text(), ["text1", "text2"])
49
def test_annotate(self):
    """annotate() is expected to return the (origin, text) pairs given."""
    self.assertEqual(KnitContent([]).annotate(), [])

    pairs = [("origin1", "text1"), ("origin2", "text2")]
    annotated = KnitContent(pairs).annotate()
    self.assertEqual(annotated,
                     [("origin1", "text1"), ("origin2", "text2")])
57
def test_annotate_iter(self):
    """annotate_iter() yields each (origin, text) pair and then stops."""
    # Iterating an empty content must be exhausted on the first step.
    empty_iter = KnitContent([]).annotate_iter()
    self.assertRaises(StopIteration, empty_iter.next)

    populated = KnitContent([("origin1", "text1"), ("origin2", "text2")])
    pair_iter = populated.annotate_iter()
    for expected in (("origin1", "text1"), ("origin2", "text2")):
        self.assertEqual(pair_iter.next(), expected)
    self.assertRaises(StopIteration, pair_iter.next)
69
def test_copy(self):
    """A copy of a KnitContent is a KnitContent with equal annotations."""
    content = KnitContent([("origin1", "text1"), ("origin2", "text2")])
    # NOTE(review): the visible code asserted on `copy` without ever binding
    # it and had no method header of its own. This assumes KnitContent
    # provides copy() -- confirm.
    copy = content.copy()
    self.assertIsInstance(copy, KnitContent)
    self.assertEqual(copy.annotate(),
                     [("origin1", "text1"), ("origin2", "text2")])
75
def test_line_delta(self):
    """line_delta() is expected to report the changed region as one hunk."""
    old = KnitContent([("", "a"), ("", "b")])
    new = KnitContent([("", "a"), ("", "a"), ("", "c")])
    expected_hunk = (1, 2, 2, [("", "a"), ("", "c")])
    self.assertEqual(old.line_delta(new), [expected_hunk])
81
def test_line_delta_iter(self):
    """line_delta_iter() yields the single expected hunk and then stops."""
    old = KnitContent([("", "a"), ("", "b")])
    new = KnitContent([("", "a"), ("", "a"), ("", "c")])
    hunks = old.line_delta_iter(new)
    first_hunk = hunks.next()
    self.assertEqual(first_hunk, (1, 2, 2, [("", "a"), ("", "c")]))
    self.assertRaises(StopIteration, hunks.next)
88
class KnitTests(TestCaseWithTransport):
89
"""Class containing knit test helper routines."""
91
def make_test_knit(self, annotate=False, delay_create=False):
    """Create the writable knit named 'test' in the current directory.

    annotate -- when true use an annotating factory, otherwise a plain one
    delay_create -- passed through to KnitVersionedFile unchanged
    """
    # The visible code always built a plain factory, ignoring `annotate`,
    # although callers pass annotate=True and then expect an annotating
    # factory on the result.
    if annotate:
        factory = KnitAnnotateFactory()
    else:
        factory = KnitPlainFactory()
    return KnitVersionedFile('test', get_transport('.'), access_mode='w',
                             factory=factory, create=True,
                             delay_create=delay_create)
101
class BasicKnitTests(KnitTests):
103
def add_stock_one_and_one_a(self, k):
    """Add 'text-1' (no parents) and 'text-1a' (parent 'text-1') to k."""
    stock = (
        ('text-1', [], TEXT_1),
        ('text-1a', ['text-1'], TEXT_1A),
        )
    for version_id, parents, text in stock:
        k.add_lines(version_id, parents, split_lines(text))
107
def test_knit_constructor(self):
    """Creating an empty knit must not raise."""
    # The knit is deliberately discarded: construction itself is the test.
    self.make_test_knit()
111
def test_knit_add(self):
    """A text stored in a knit is reported present and reads back intact."""
    knit = self.make_test_knit()
    knit.add_lines('text-1', [], split_lines(TEXT_1))
    self.assertTrue(knit.has_version('text-1'))
    stored_text = ''.join(knit.get_lines('text-1'))
    self.assertEqualDiff(stored_text, TEXT_1)
118
def test_knit_reload(self):
    """Content written via one knit object is readable via a second one."""
    writer = self.make_test_knit()
    writer.add_lines('text-1', [], split_lines(TEXT_1))
    # Open the same 'test' knit again, this time with access_mode='r'.
    reader = KnitVersionedFile(
        'test', get_transport('.'),
        access_mode='r', factory=KnitPlainFactory(), create=True)
    self.assertTrue(reader.has_version('text-1'))
    reloaded_text = ''.join(reader.get_lines('text-1'))
    self.assertEqualDiff(reloaded_text, TEXT_1)
127
def test_knit_several(self):
    """Two unrelated texts stored in one knit each read back intact."""
    knit = self.make_test_knit()
    texts = (('text-1', TEXT_1), ('text-2', TEXT_2))
    for version_id, text in texts:
        knit.add_lines(version_id, [], split_lines(text))
    for version_id, text in texts:
        self.assertEqualDiff(''.join(knit.get_lines(version_id)), text)
135
def test_repeated_add(self):
    """Knit traps attempt to replace existing version"""
    k = self.make_test_knit()
    k.add_lines('text-1', [], split_lines(TEXT_1))
    # assertRaises takes the callable right after the exception class; the
    # visible code passed only the arguments, so 'text-1' itself would have
    # been called. Adding 'text-1' a second time is the operation under test.
    self.assertRaises(RevisionAlreadyPresent,
                      k.add_lines,
                      'text-1', [], split_lines(TEXT_1))
143
def test_empty(self):
    """A version added with no lines reads back as an empty list."""
    knit = self.make_test_knit(True)
    no_lines = []
    knit.add_lines('text-1', [], no_lines)
    self.assertEquals(knit.get_lines('text-1'), [])
148
def test_incomplete(self):
    """Test if texts without a ending line-end can be inserted and
    read back unchanged.
    """
    # The docstring above was left unterminated in the visible code, which
    # swallowed the statements below into the string.
    k = KnitVersionedFile('test', get_transport('.'), delta=False, create=True)
    k.add_lines('text-1', [], ['a\n', 'b'])
    k.add_lines('text-2', ['text-1'], ['a\rb\n', 'b\n'])
    # reopening ensures maximum room for confusion
    k = KnitVersionedFile('test', get_transport('.'), delta=False, create=True)
    self.assertEquals(k.get_lines('text-1'), ['a\n', 'b'])
    self.assertEquals(k.get_lines('text-2'), ['a\rb\n', 'b\n'])
159
def test_delta(self):
    """The TEXT_1 -> TEXT_1A line delta has the expected form and applies."""
    # The knit is not consulted below, but it is still created as before.
    k = self.make_test_knit()
    basis = TEXT_1.splitlines(True)
    target = TEXT_1A.splitlines(True)
    delta = list(line_delta(basis, target))
    self.assertEqualDiff(''.join(delta), delta_1_1a)
    # Apply to a fresh split of TEXT_1, exactly as the delta was computed.
    rebuilt = apply_line_delta(TEXT_1.splitlines(True), delta)
    self.assertEqualDiff(''.join(rebuilt), TEXT_1A)
168
def test_add_with_parents(self):
    """get_parents() reports the parents each version was added with."""
    knit = self.make_test_knit()
    self.add_stock_one_and_one_a(knit)
    root_parents = knit.get_parents('text-1')
    self.assertEquals(root_parents, [])
    child_parents = knit.get_parents('text-1a')
    self.assertEquals(child_parents, ['text-1'])
175
def test_ancestry(self):
    """get_ancestry() of 'text-1a' covers the version and its parent."""
    knit = self.make_test_knit()
    self.add_stock_one_and_one_a(knit)
    ancestry = set(knit.get_ancestry(['text-1a']))
    # Compared as sets: the order of the returned ancestry is not checked.
    self.assertEquals(ancestry, set(['text-1a', 'text-1']))
181
def test_add_delta(self):
    """A child text stored in a delta=True knit reads back intact."""
    knit = KnitVersionedFile('test', get_transport('.'),
                             factory=KnitPlainFactory(),
                             delta=True, create=True)
    self.add_stock_one_and_one_a(knit)
    child_text = ''.join(knit.get_lines('text-1a'))
    self.assertEqualDiff(child_text, TEXT_1A)
189
def test_annotate(self):
    """Run the small annotation check on an annotating knit, delta=True."""
    annotated_knit = KnitVersionedFile(
        'knit', get_transport('.'),
        factory=KnitAnnotateFactory(), delta=True, create=True)
    self.insert_and_test_small_annotate(annotated_knit)
195
def insert_and_test_small_annotate(self, k):
    """Add a parent and a child text to k and check the child's annotation.

    The unchanged first line must be attributed to 'text-1' and the changed
    second line to 'text-2'.
    """
    k.add_lines('text-1', [], ['a\n', 'b\n'])
    k.add_lines('text-2', ['text-1'], ['a\n', 'c\n'])

    annotated = k.annotate('text-2')
    expected = (('text-1', 'a\n'), ('text-2', 'c\n'))
    for index, origin in enumerate(expected):
        self.assertEquals(annotated[index], origin)
204
def test_annotate_fulltext(self):
    """Run the small annotation check on an annotating knit, delta=False."""
    fulltext_knit = KnitVersionedFile(
        'knit', get_transport('.'),
        factory=KnitAnnotateFactory(), delta=False, create=True)
    self.insert_and_test_small_annotate(fulltext_knit)
210
def test_annotate_merge_1(self):
    """Each line of a two-parent merge is attributed to the parent it matches."""
    knit = self.make_test_knit(True)
    knit.add_lines('text-a1', [], ['a\n', 'b\n'])
    knit.add_lines('text-a2', [], ['d\n', 'c\n'])
    knit.add_lines('text-am', ['text-a1', 'text-a2'], ['d\n', 'b\n'])
    annotated = knit.annotate('text-am')
    expected = (('text-a2', 'd\n'), ('text-a1', 'b\n'))
    for index, origin in enumerate(expected):
        self.assertEquals(annotated[index], origin)
219
def test_annotate_merge_2(self):
    """A merge interleaving lines of both parents is attributed per line."""
    knit = self.make_test_knit(True)
    knit.add_lines('text-a1', [], ['a\n', 'b\n', 'c\n'])
    knit.add_lines('text-a2', [], ['x\n', 'y\n', 'z\n'])
    knit.add_lines('text-am', ['text-a1', 'text-a2'], ['a\n', 'y\n', 'c\n'])
    annotated = knit.annotate('text-am')
    expected = (('text-a1', 'a\n'), ('text-a2', 'y\n'), ('text-a1', 'c\n'))
    for index, origin in enumerate(expected):
        self.assertEquals(annotated[index], origin)
229
def test_annotate_merge_9(self):
    """A line new in the merge is attributed to the merge version itself."""
    knit = self.make_test_knit(True)
    knit.add_lines('text-a1', [], ['a\n', 'b\n', 'c\n'])
    knit.add_lines('text-a2', [], ['x\n', 'y\n', 'z\n'])
    knit.add_lines('text-am', ['text-a1', 'text-a2'], ['k\n', 'y\n', 'c\n'])
    annotated = knit.annotate('text-am')
    expected = (('text-am', 'k\n'), ('text-a2', 'y\n'), ('text-a1', 'c\n'))
    for index, origin in enumerate(expected):
        self.assertEquals(annotated[index], origin)
239
def test_annotate_merge_3(self):
    """A new first line plus two lines kept from the second parent."""
    knit = self.make_test_knit(True)
    knit.add_lines('text-a1', [], ['a\n', 'b\n', 'c\n'])
    knit.add_lines('text-a2', [], ['x\n', 'y\n', 'z\n'])
    knit.add_lines('text-am', ['text-a1', 'text-a2'], ['k\n', 'y\n', 'z\n'])
    annotated = knit.annotate('text-am')
    expected = (('text-am', 'k\n'), ('text-a2', 'y\n'), ('text-a2', 'z\n'))
    for index, origin in enumerate(expected):
        self.assertEquals(annotated[index], origin)
249
def test_annotate_merge_4(self):
    """Lines a parent inherited are attributed to where they first appeared."""
    knit = self.make_test_knit(True)
    knit.add_lines('text-a1', [], ['a\n', 'b\n', 'c\n'])
    knit.add_lines('text-a2', [], ['x\n', 'y\n', 'z\n'])
    # text-a3 keeps the first two lines of text-a1 and changes the third.
    knit.add_lines('text-a3', ['text-a1'], ['a\n', 'b\n', 'p\n'])
    knit.add_lines('text-am', ['text-a2', 'text-a3'], ['a\n', 'b\n', 'z\n'])
    annotated = knit.annotate('text-am')
    expected = (('text-a1', 'a\n'), ('text-a1', 'b\n'), ('text-a2', 'z\n'))
    for index, origin in enumerate(expected):
        self.assertEquals(annotated[index], origin)
260
def test_annotate_merge_5(self):
    """A three-parent merge taking one line from each parent."""
    knit = self.make_test_knit(True)
    knit.add_lines('text-a1', [], ['a\n', 'b\n', 'c\n'])
    knit.add_lines('text-a2', [], ['d\n', 'e\n', 'f\n'])
    knit.add_lines('text-a3', [], ['x\n', 'y\n', 'z\n'])
    merge_parents = ['text-a1', 'text-a2', 'text-a3']
    knit.add_lines('text-am', merge_parents, ['a\n', 'e\n', 'z\n'])
    annotated = knit.annotate('text-am')
    expected = (('text-a1', 'a\n'), ('text-a2', 'e\n'), ('text-a3', 'z\n'))
    for index, origin in enumerate(expected):
        self.assertEquals(annotated[index], origin)
273
def test_annotate_file_cherry_pick(self):
    """Lines restored from the grandparent are attributed to 'text-1'."""
    knit = self.make_test_knit(True)
    knit.add_lines('text-1', [], ['a\n', 'b\n', 'c\n'])
    knit.add_lines('text-2', ['text-1'], ['d\n', 'e\n', 'f\n'])
    # text-3 goes back to the exact lines of text-1.
    knit.add_lines('text-3', ['text-2', 'text-1'], ['a\n', 'b\n', 'c\n'])
    annotated = knit.annotate('text-3')
    expected = (('text-1', 'a\n'), ('text-1', 'b\n'), ('text-1', 'c\n'))
    for index, origin in enumerate(expected):
        self.assertEquals(annotated[index], origin)
283
def test_knit_join(self):
    """Joining on one version id is expected to copy its ancestors too."""
    source = KnitVersionedFile('test1', get_transport('.'),
                               factory=KnitPlainFactory(), create=True)
    # Two independent two-version chains, merged by 'text-m'.
    graph = (
        ('text-a', []),
        ('text-b', ['text-a']),
        ('text-c', []),
        ('text-d', ['text-c']),
        ('text-m', ['text-b', 'text-d']),
        )
    for version_id, parents in graph:
        source.add_lines(version_id, parents, split_lines(TEXT_1))

    target = KnitVersionedFile('test2', get_transport('.'),
                               factory=KnitPlainFactory(), create=True)
    count = target.join(source, version_ids=['text-m'])
    self.assertEquals(count, 5)
    for root_id in ('text-a', 'text-c'):
        self.assertTrue(target.has_version(root_id))
300
def test_reannotate(self):
    """Annotations stay correct for a version joined in from another knit."""
    transport_here = get_transport('.')
    k1 = KnitVersionedFile('knit1', transport_here,
                           factory=KnitAnnotateFactory(), create=True)
    k1.add_lines('text-a', [], ['a\n', 'b\n'])
    k1.add_lines('text-b', ['text-a'], ['a\n', 'c\n'])

    k2 = KnitVersionedFile('test2', get_transport('.'),
                           factory=KnitAnnotateFactory(), create=True)
    k2.join(k1, version_ids=['text-b'])

    # Let the two knits diverge before joining back.
    k1.add_lines('text-X', ['text-b'], ['a\n', 'b\n'])
    k2.add_lines('text-c', ['text-b'], ['z\n', 'c\n'])
    k2.add_lines('text-Y', ['text-b'], ['b\n', 'c\n'])

    # text-c will have index 3
    k1.join(k2, version_ids=['text-c'])

    joined_lines = k1.get_lines('text-c')
    self.assertEquals(joined_lines, ['z\n', 'c\n'])

    annotated = k1.annotate('text-c')
    self.assertEquals(annotated[0], ('text-c', 'z\n'))
    self.assertEquals(annotated[1], ('text-b', 'c\n'))
329
def test_get_line_delta_texts(self):
    """Make sure we can call get_texts on text with reused line deltas"""
    k1 = KnitVersionedFile('test1', get_transport('.'),
                           factory=KnitPlainFactory(), create=True)
    # NOTE(review): the visible code used `t` without any loop binding it.
    # range(3) is chosen to match the versions requested from get_texts
    # below, and version '0' gets no parent because there is no '-1' --
    # confirm against the intended fixture.
    for t in range(3):
        if t == 0:
            parents = []
        else:
            parents = ['%d' % (t-1)]
        k1.add_lines('%d' % t, parents, ['hello\n'] * t)
    k1.get_texts(('%d' % t) for t in range(3))
341
def test_iter_lines_reads_in_order(self):
    """Versions requested last-first are expected to be read in file order."""
    backing = MemoryTransport()
    instrumented_t = TransportLogger(backing)
    k1 = KnitVersionedFile('id', instrumented_t, create=True, delta=True)
    # Creation is expected to log exactly one call, naming the index file.
    self.assertEqual([('id.kndx',)], instrumented_t._calls)
    # add texts with no required ordering
    k1.add_lines('base', [], ['text\n'])
    k1.add_lines('base2', [], ['text2\n'])

    # Forget the calls logged so far, then request a last-first iteration.
    instrumented_t._calls = []
    requested = ['base2', 'base']
    results = list(k1.iter_lines_added_or_present_in_versions(requested))
    self.assertEqual([('id.knit', [(0, 87), (87, 89)])], instrumented_t._calls)
    self.assertEqual(['text\n', 'text2\n'], results)
356
def test_create_empty_annotated(self):
    """create_empty() keeps the annotating factory and the delta setting."""
    source = self.make_test_knit(True)
    source.add_lines('text-a', [], ['a\n', 'b\n'])
    empty = source.create_empty('t', MemoryTransport())
    self.assertTrue(isinstance(empty.factory, KnitAnnotateFactory))
    self.assertEqual(source.delta, empty.delta)
    # the generic test checks for empty content and file class
365
def test_knit_format(self):
    """Check the knit index file contents as records are added.

    Also checks that a trailing, partially written index record is ignored
    when the knit is loaded, and that the same revision id can then be
    added for real.
    """
    knit = self.make_test_knit(True)
    # make_test_knit defaults to delay_create=False, so the index header is
    # expected on disk before any data is added.
    self.assertFileEqual("# bzr knit index 8\n", 'test.kndx')
    knit.add_lines_with_ghosts('revid', ['a_ghost'], ['a\n'])
    # The two multi-line assertFileEqual calls were left unclosed in the
    # visible code; the path argument mirrors the single-line call above and
    # the blank line after the header mirrors the second expectation.
    self.assertFileEqual(
        "# bzr knit index 8\n"
        "\n"
        "revid fulltext 0 84 .a_ghost :",
        'test.kndx')
    knit.add_lines_with_ghosts('revid2', ['revid'], ['a\n'])
    self.assertFileEqual(
        "# bzr knit index 8\n"
        "\nrevid fulltext 0 84 .a_ghost :"
        "\nrevid2 line-delta 84 82 0 :",
        'test.kndx')
    # we should be able to load this file again
    knit = KnitVersionedFile('test', get_transport('.'), access_mode='r')
    self.assertEqual(['revid', 'revid2'], knit.versions())
    # write a short write to the file and ensure that its ignored
    indexfile = open('test.kndx', 'at')
    try:
        indexfile.write('\nrevid3 line-delta 166 82 1 2 3 4 5 .phwoar:demo ')
    finally:
        # Close explicitly so the partial record is flushed to disk before
        # the knit is reopened below.
        indexfile.close()
    # we should be able to load this file again
    knit = KnitVersionedFile('test', get_transport('.'), access_mode='w')
    self.assertEqual(['revid', 'revid2'], knit.versions())
    # and add a revision with the same id the failed write had
    knit.add_lines('revid3', ['revid2'], ['a\n'])
    # and when reading it revid3 should now appear.
    knit = KnitVersionedFile('test', get_transport('.'), access_mode='r')
    self.assertEqual(['revid', 'revid2', 'revid3'], knit.versions())
    self.assertEqual(['revid2'], knit.get_parents('revid3'))
400
def test_delay_create(self):
    """Test that passing delay_create=True creates files late"""
    knit = self.make_test_knit(annotate=True, delay_create=True)
    # Nothing may exist on disk until the first record is added.
    self.failIfExists('test.knit')
    self.failIfExists('test.kndx')
    knit.add_lines_with_ghosts('revid', ['a_ghost'], ['a\n'])
    self.failUnlessExists('test.knit')
    # The assertFileEqual call was left unclosed in the visible code.
    # NOTE(review): the blank line after the header and the 'test.kndx' path
    # are taken from the parallel index checks elsewhere in this file --
    # confirm.
    self.assertFileEqual(
        "# bzr knit index 8\n"
        "\n"
        "revid fulltext 0 84 .a_ghost :",
        'test.kndx')
413
def test_create_parent_dir(self):
    """create_parent_dir can create knits in nonexistant dirs"""
    trans = get_transport('.')
    # Has no effect if we don't set 'delay_create'
    self.assertRaises(NoSuchFile, KnitVersionedFile, 'dir/test',
                      trans, access_mode='w', factory=None,
                      create=True, create_parent_dir=True)
    # Nothing should have changed yet
    # NOTE(review): this call was left unclosed in the visible code;
    # delay_create=True is supplied because the comment above says
    # create_parent_dir has no effect without it -- confirm.
    knit = KnitVersionedFile('dir/test', trans, access_mode='w',
                             factory=None, create=True,
                             create_parent_dir=True,
                             delay_create=True)
    self.failIfExists('dir/test.knit')
    self.failIfExists('dir/test.kndx')
    self.failIfExists('dir')
    knit.add_lines('revid', [], ['a\n'])
    self.failUnlessExists('dir')
    self.failUnlessExists('dir/test.knit')
    # NOTE(review): the expected-content call was also unclosed; the blank
    # line after the header and the index path are inferred from the
    # parallel index checks in this file -- confirm.
    self.assertFileEqual(
        "# bzr knit index 8\n"
        "\n"
        "revid fulltext 0 84 :",
        'dir/test.kndx')
437
def test_create_mode_700(self):
438
trans = get_transport('.')
439
if not trans._can_roundtrip_unix_modebits():
440
# Can't roundtrip, so no need to run this test
442
knit = KnitVersionedFile('dir/test', trans, access_mode='w',
443
factory=None, create=True,
444
create_parent_dir=True,
448
knit.add_lines('revid', [], ['a\n'])
449
self.assertTransportMode(trans, 'dir', 0700)
450
self.assertTransportMode(trans, 'dir/test.knit', 0600)
451
self.assertTransportMode(trans, 'dir/test.kndx', 0600)
453
def test_create_mode_770(self):
454
trans = get_transport('.')
455
if not trans._can_roundtrip_unix_modebits():
456
# Can't roundtrip, so no need to run this test
458
knit = KnitVersionedFile('dir/test', trans, access_mode='w',
459
factory=None, create=True,
460
create_parent_dir=True,
464
knit.add_lines('revid', [], ['a\n'])
465
self.assertTransportMode(trans, 'dir', 0770)
466
self.assertTransportMode(trans, 'dir/test.knit', 0660)
467
self.assertTransportMode(trans, 'dir/test.kndx', 0660)
469
def test_create_mode_777(self):
470
trans = get_transport('.')
471
if not trans._can_roundtrip_unix_modebits():
472
# Can't roundtrip, so no need to run this test
474
knit = KnitVersionedFile('dir/test', trans, access_mode='w',
475
factory=None, create=True,
476
create_parent_dir=True,
480
knit.add_lines('revid', [], ['a\n'])
481
self.assertTransportMode(trans, 'dir', 0777)
482
self.assertTransportMode(trans, 'dir/test.knit', 0666)
483
self.assertTransportMode(trans, 'dir/test.kndx', 0666)
485
def test_plan_merge(self):
    """plan_merge of two children of text1 matches AB_MERGE line by line."""
    my_knit = self.make_test_knit(annotate=True)
    fixture = (
        ('text1', [], TEXT_1),
        ('text1a', ['text1'], TEXT_1A),
        ('text1b', ['text1'], TEXT_1B),
        )
    for version_id, parents, text in fixture:
        my_knit.add_lines(version_id, parents, split_lines(text))
    plan = list(my_knit.plan_merge('text1a', 'text1b'))
    # NOTE(review): zip() stops at the shorter sequence, so a plan longer or
    # shorter than AB_MERGE is not detected by this loop.
    for actual, wanted in zip(plan, AB_MERGE):
        self.assertEqual(actual, wanted)
504
Banana cup cake recipe
514
Banana cup cake recipe
516
- bananas (do not use plantains!!!)
523
Banana cup cake recipe
539
AB_MERGE_TEXT="""unchanged|Banana cup cake recipe
544
new-b|- bananas (do not use plantains!!!)
545
unchanged|- broken tea cups
546
new-a|- self-raising flour
549
AB_MERGE=[tuple(l.split('|')) for l in AB_MERGE_TEXT.splitlines(True)]
552
def line_delta(from_lines, to_lines):
    """Generate line-based delta from one text to another

    from_lines -- list of lines of the basis text
    to_lines -- list of lines of the target text

    Yields, for every changed region, a header line 'start,end,count\\n'
    (the basis range being replaced and the number of replacement lines)
    followed by that many lines taken from to_lines.
    """
    s = difflib.SequenceMatcher(None, from_lines, to_lines)
    for op in s.get_opcodes():
        # Unchanged regions need no instruction in the delta.
        if op[0] == 'equal':
            continue
        yield '%d,%d,%d\n' % (op[1], op[2], op[4]-op[3])
        # The visible inner loop had no body; the header promises
        # op[4]-op[3] content lines, so emit exactly those.
        for i in range(op[3], op[4]):
            yield to_lines[i]
563
def apply_line_delta(basis_lines, delta_lines):
    """Apply a line-based perfect diff

    basis_lines -- text to apply the patch to
    delta_lines -- diff instructions and content

    Each instruction is a header line 'start,end,count' followed by `count`
    replacement lines; start and end index into the original basis text.
    Returns a new list; basis_lines is left untouched.
    """
    out = basis_lines[:]
    i = 0
    # How far positions in `out` have drifted from positions in the basis.
    offset = 0
    while i < len(delta_lines):
        header = delta_lines[i]
        # int() accepts the trailing newline on the last field.
        a, b, c = [int(field) for field in header.split(',')]
        i = i + 1
        out[offset+a:offset+b] = delta_lines[i:i+c]
        i = i + c
        # (b - a) lines were removed and c inserted, so later basis
        # positions shift by the difference. The previous sum of the two
        # misplaced every hunk after the first.
        offset = offset + c - (b - a)
    return out
582
class TestWeaveToKnit(KnitTests):
584
def test_weave_to_knit_matches(self):
    """WeaveToKnit.is_compatible accepts only a (Weave, knit) pair."""
    # check that the WeaveToKnit is_compatible function
    # registers True for a Weave to a Knit.
    # `w` was never bound in the visible code; Weave is imported at the top
    # of the file and the comment above names it as the source type.
    w = Weave()
    k = self.make_test_knit()
    self.failUnless(WeaveToKnit.is_compatible(w, k))
    self.failIf(WeaveToKnit.is_compatible(k, w))
    self.failIf(WeaveToKnit.is_compatible(w, w))
    self.failIf(WeaveToKnit.is_compatible(k, k))
595
class TestKnitCaching(KnitTests):
597
def create_knit(self, cache_add=False):
    """Return a test knit (made with annotate=True) holding two texts.

    cache_add -- when true, turn caching on before the texts are added
    """
    k = self.make_test_knit(True)
    if cache_add:
        # NOTE(review): the visible code ignored cache_add. This assumes
        # the knit exposes enable_cache() -- confirm.
        k.enable_cache()
    k.add_lines('text-1', [], split_lines(TEXT_1))
    k.add_lines('text-2', [], split_lines(TEXT_2))
    # Every caller uses the result, so the knit must be returned.
    return k
606
def test_no_caching(self):
    """With caching not requested, the data cache is expected to be empty."""
    knit = self.create_knit()
    # Nothing should be cached without setting 'enable_cache'
    cache = knit._data._cache
    self.assertEqual({}, cache)
611
def test_cache_add_and_clear(self):
    """Texts added with caching on are cached until the cache is cleared."""
    k = self.create_knit(True)

    self.assertEqual(['text-1', 'text-2'], sorted(k._data._cache.keys()))

    # NOTE(review): the visible code asserted a populated and then an empty
    # cache with nothing in between. This assumes the knit exposes
    # clear_cache() -- confirm.
    k.clear_cache()
    self.assertEqual({}, k._data._cache)
619
def test_cache_data_read_raw(self):
    """Raw record reads are cached while caching is on, not after a clear."""
    k = self.create_knit()

    # NOTE(review): the enable/clear steps were missing from the visible
    # code; enable_cache() and clear_cache() are assumed knit methods --
    # confirm.
    k.enable_cache()

    def read_one_raw(version):
        pos_map = k._get_components_positions([version])
        method, pos, size, next = pos_map[version]
        lst = list(k._data.read_records_iter_raw([(version, pos, size)]))
        self.assertEqual(1, len(lst))
        # Callers index into the result, so hand back the single record.
        return lst[0]

    val = read_one_raw('text-1')
    self.assertEqual({'text-1':val[1]}, k._data._cache)

    k.clear_cache()
    # After clear, new reads are not cached
    self.assertEqual({}, k._data._cache)

    val2 = read_one_raw('text-1')
    self.assertEqual(val, val2)
    self.assertEqual({}, k._data._cache)
643
def test_cache_data_read(self):
    """Parsed record reads are cached while caching is on, not after a clear."""
    k = self.create_knit()

    def read_one(version):
        pos_map = k._get_components_positions([version])
        method, pos, size, next = pos_map[version]
        lst = list(k._data.read_records_iter([(version, pos, size)]))
        self.assertEqual(1, len(lst))
        # Callers index into the result, so hand back the single record.
        return lst[0]

    # NOTE(review): the enable/clear steps were missing from the visible
    # code; enable_cache() and clear_cache() are assumed knit methods --
    # confirm.
    k.enable_cache()

    val = read_one('text-2')
    self.assertEqual(['text-2'], k._data._cache.keys())
    self.assertEqual('text-2', val[0])
    # Re-parsing the cached raw record must give what the read returned.
    content, digest = k._data._parse_record('text-2',
                                            k._data._cache['text-2'])
    self.assertEqual(content, val[1])
    self.assertEqual(digest, val[2])

    k.clear_cache()
    self.assertEqual({}, k._data._cache)

    val2 = read_one('text-2')
    self.assertEqual(val, val2)
    self.assertEqual({}, k._data._cache)
671
def test_cache_read(self):
    """get_text() populates the cache while caching is on, not after a clear."""
    k = self.create_knit()
    # NOTE(review): the enable/clear steps were missing from the visible
    # code; enable_cache() and clear_cache() are assumed knit methods --
    # confirm.
    k.enable_cache()

    text = k.get_text('text-1')
    self.assertEqual(TEXT_1, text)
    self.assertEqual(['text-1'], k._data._cache.keys())

    k.clear_cache()
    self.assertEqual({}, k._data._cache)

    text = k.get_text('text-1')
    self.assertEqual(TEXT_1, text)
    self.assertEqual({}, k._data._cache)
687
class TestKnitIndex(KnitTests):
689
def test_add_versions_dictionary_compresses(self):
    """Adding versions to the index should update the lookup dict"""
    knit = self.make_test_knit()
    # NOTE(review): `idx` was never bound in the visible code; this assumes
    # the knit keeps its index object in `_index` -- confirm.
    idx = knit._index
    idx.add_version('a-1', ['fulltext'], 0, 0, [])
    # NOTE(review): the tail of this expectation was missing; it is rebuilt
    # from the first record of the fuller expectation below -- confirm.
    self.check_file_contents('test.kndx',
        '# bzr knit index 8\n'
        '\n'
        'a-1 fulltext 0 0 :'
        )

    idx.add_versions([('a-2', ['fulltext'], 0, 0, ['a-1']),
                      ('a-3', ['fulltext'], 0, 0, ['a-2']),
                     ])
    # In the file each parent is written as a number (0 for a-1, 1 for a-2).
    self.check_file_contents('test.kndx',
        '# bzr knit index 8\n'
        '\n'
        'a-1 fulltext 0 0 :\n'
        'a-2 fulltext 0 0 0 :\n'
        'a-3 fulltext 0 0 1 :'
        )
    self.assertEqual(['a-1', 'a-2', 'a-3'], idx._history)
    self.assertEqual({'a-1':('a-1', ['fulltext'], 0, 0, [], 0),
                      'a-2':('a-2', ['fulltext'], 0, 0, ['a-1'], 1),
                      'a-3':('a-3', ['fulltext'], 0, 0, ['a-2'], 2),
                     }, idx._cache)
715
def test_add_versions_fails_clean(self):
716
"""If add_versions fails in the middle, it restores a pristine state.
718
Any modifications that are made to the index are reset if all versions
721
# This cheats a little bit by passing in a generator which will
722
# raise an exception before the processing finishes
723
# Other possibilities would be to have an version with the wrong number
724
# of entries, or to make the backing transport unable to write any
727
knit = self.make_test_knit()
729
idx.add_version('a-1', ['fulltext'], 0, 0, [])
731
class StopEarly(Exception):
734
def generate_failure():
735
"""Add some entries and then raise an exception"""
736
yield ('a-2', ['fulltext'], 0, 0, ['a-1'])
737
yield ('a-3', ['fulltext'], 0, 0, ['a-2'])
740
# Assert the pre-condition
741
self.assertEqual(['a-1'], idx._history)
742
self.assertEqual({'a-1':('a-1', ['fulltext'], 0, 0, [], 0)}, idx._cache)
744
self.assertRaises(StopEarly, idx.add_versions, generate_failure())
746
# And it shouldn't be modified
747
self.assertEqual(['a-1'], idx._history)
748
self.assertEqual({'a-1':('a-1', ['fulltext'], 0, 0, [], 0)}, idx._cache)