29
29
from bzrlib.osutils import split_lines
30
30
from bzrlib.tests import TestCaseWithTransport
31
from bzrlib.transport import TransportLogger
32
from bzrlib.transport.local import LocalTransport
31
from bzrlib.transport import TransportLogger, get_transport
33
32
from bzrlib.transport.memory import MemoryTransport
34
33
from bzrlib.weave import Weave
42
41
factory = KnitPlainFactory()
45
return KnitVersionedFile('test', LocalTransport('.'), access_mode='w', factory=factory, create=True)
44
return KnitVersionedFile('test', get_transport('.'), access_mode='w', factory=factory, create=True)
48
47
class BasicKnitTests(KnitTests):
67
66
k = self.make_test_knit()
68
67
k.add_lines('text-1', [], split_lines(TEXT_1))
70
k2 = KnitVersionedFile('test', LocalTransport('.'), access_mode='r', factory=KnitPlainFactory(), create=True)
69
k2 = KnitVersionedFile('test', get_transport('.'), access_mode='r', factory=KnitPlainFactory(), create=True)
71
70
self.assertTrue(k2.has_version('text-1'))
72
71
self.assertEqualDiff(''.join(k2.get_lines('text-1')), TEXT_1)
95
94
def test_incomplete(self):
96
95
"""Test if texts without a ending line-end can be inserted and
98
k = KnitVersionedFile('test', LocalTransport('.'), delta=False, create=True)
97
k = KnitVersionedFile('test', get_transport('.'), delta=False, create=True)
99
98
k.add_lines('text-1', [], ['a\n', 'b' ])
100
99
k.add_lines('text-2', ['text-1'], ['a\rb\n', 'b\n'])
101
100
# reopening ensures maximum room for confusion
102
k = KnitVersionedFile('test', LocalTransport('.'), delta=False, create=True)
101
k = KnitVersionedFile('test', get_transport('.'), delta=False, create=True)
103
102
self.assertEquals(k.get_lines('text-1'), ['a\n', 'b' ])
104
103
self.assertEquals(k.get_lines('text-2'), ['a\rb\n', 'b\n'])
128
127
def test_add_delta(self):
129
128
"""Store in knit with parents"""
130
k = KnitVersionedFile('test', LocalTransport('.'), factory=KnitPlainFactory(),
129
k = KnitVersionedFile('test', get_transport('.'), factory=KnitPlainFactory(),
131
130
delta=True, create=True)
132
131
self.add_stock_one_and_one_a(k)
151
150
def test_annotate_fulltext(self):
152
151
"""Annotations"""
153
k = KnitVersionedFile('knit', LocalTransport('.'), factory=KnitAnnotateFactory(),
152
k = KnitVersionedFile('knit', get_transport('.'), factory=KnitAnnotateFactory(),
154
153
delta=False, create=True)
155
154
self.insert_and_test_small_annotate(k)
230
229
def test_knit_join(self):
231
230
"""Store in knit with parents"""
232
k1 = KnitVersionedFile('test1', LocalTransport('.'), factory=KnitPlainFactory(), create=True)
231
k1 = KnitVersionedFile('test1', get_transport('.'), factory=KnitPlainFactory(), create=True)
233
232
k1.add_lines('text-a', [], split_lines(TEXT_1))
234
233
k1.add_lines('text-b', ['text-a'], split_lines(TEXT_1))
239
238
k1.add_lines('text-m', ['text-b', 'text-d'], split_lines(TEXT_1))
241
k2 = KnitVersionedFile('test2', LocalTransport('.'), factory=KnitPlainFactory(), create=True)
240
k2 = KnitVersionedFile('test2', get_transport('.'), factory=KnitPlainFactory(), create=True)
242
241
count = k2.join(k1, version_ids=['text-m'])
243
242
self.assertEquals(count, 5)
244
243
self.assertTrue(k2.has_version('text-a'))
245
244
self.assertTrue(k2.has_version('text-c'))
247
246
def test_reannotate(self):
248
k1 = KnitVersionedFile('knit1', LocalTransport('.'),
247
k1 = KnitVersionedFile('knit1', get_transport('.'),
249
248
factory=KnitAnnotateFactory(), create=True)
251
250
k1.add_lines('text-a', [], ['a\n', 'b\n'])
253
252
k1.add_lines('text-b', ['text-a'], ['a\n', 'c\n'])
255
k2 = KnitVersionedFile('test2', LocalTransport('.'),
254
k2 = KnitVersionedFile('test2', get_transport('.'),
256
255
factory=KnitAnnotateFactory(), create=True)
257
256
k2.join(k1, version_ids=['text-b'])
273
272
self.assertEquals(origins[0], ('text-c', 'z\n'))
274
273
self.assertEquals(origins[1], ('text-b', 'c\n'))
276
def test_extraction_reads_components_once(self):
277
t = MemoryTransport()
278
instrumented_t = TransportLogger(t)
279
k1 = KnitVersionedFile('id', instrumented_t, create=True, delta=True)
280
# should read the index
281
self.assertEqual([('id.kndx',)], instrumented_t._calls)
282
instrumented_t._calls = []
284
k1.add_lines('base', [], ['text\n'])
285
# should not have read at all
286
self.assertEqual([], instrumented_t._calls)
289
k1.add_lines('sub', ['base'], ['text\n', 'text2\n'])
290
# should not have read at all
291
self.assertEqual([], instrumented_t._calls)
295
# should not have read at all
296
self.assertEqual([], instrumented_t._calls)
303
# should have read a component
304
# should have read the first component only
305
self.assertEqual([('id.knit', [(0, 87)])], instrumented_t._calls)
306
instrumented_t._calls = []
309
# should not have read at all
310
self.assertEqual([], instrumented_t._calls)
311
# and now read the other component
313
# should have read the second component
314
self.assertEqual([('id.knit', [(87, 93)])], instrumented_t._calls)
315
instrumented_t._calls = []
320
k1.add_lines('sub2', ['base'], ['text\n', 'text3\n'])
321
# should read the first component only
322
self.assertEqual([('id.knit', [(0, 87)])], instrumented_t._calls)
275
def test_get_line_delta_texts(self):
276
"""Make sure we can call get_texts on text with reused line deltas"""
277
k1 = KnitVersionedFile('test1', get_transport('.'),
278
factory=KnitPlainFactory(), create=True)
283
parents = ['%d' % (t-1)]
284
k1.add_lines('%d' % t, parents, ['hello\n'] * t)
285
k1.get_texts(('%d' % t) for t in range(3))
324
287
def test_iter_lines_reads_in_order(self):
325
288
t = MemoryTransport()
363
326
"\nrevid2 line-delta 84 82 0 :",
365
328
# we should be able to load this file again
366
knit = KnitVersionedFile('test', LocalTransport('.'), access_mode='r')
329
knit = KnitVersionedFile('test', get_transport('.'), access_mode='r')
367
330
self.assertEqual(['revid', 'revid2'], knit.versions())
368
331
# write a short write to the file and ensure that it's ignored
369
332
indexfile = file('test.kndx', 'at')
370
333
indexfile.write('\nrevid3 line-delta 166 82 1 2 3 4 5 .phwoar:demo ')
371
334
indexfile.close()
372
335
# we should be able to load this file again
373
knit = KnitVersionedFile('test', LocalTransport('.'), access_mode='w')
336
knit = KnitVersionedFile('test', get_transport('.'), access_mode='w')
374
337
self.assertEqual(['revid', 'revid2'], knit.versions())
375
338
# and add a revision with the same id the failed write had
376
339
knit.add_lines('revid3', ['revid2'], ['a\n'])
377
340
# and when reading it revid3 should now appear.
378
knit = KnitVersionedFile('test', LocalTransport('.'), access_mode='r')
341
knit = KnitVersionedFile('test', get_transport('.'), access_mode='r')
379
342
self.assertEqual(['revid', 'revid2', 'revid3'], knit.versions())
380
343
self.assertEqual(['revid2'], knit.get_parents('revid3'))