492
495
self.addCleanup(reset)
493
496
from bzrlib._knit_load_data_py import _load_data_py
494
497
knit._load_data = _load_data_py
495
return _KnitIndex(*args, **kwargs)
498
return _KnitIndex(get_scope=lambda:None, *args, **kwargs)
497
500
def test_no_such_file(self):
498
501
transport = MockTransport()
1075
1078
self.addCleanup(reset)
1076
1079
from bzrlib._knit_load_data_c import _load_data_c
1077
1080
knit._load_data = _load_data_c
1078
return _KnitIndex(*args, **kwargs)
1081
return _KnitIndex(get_scope=lambda:None, *args, **kwargs)
1082
1084
class KnitTests(TestCaseWithTransport):
1083
1085
"""Class containing knit test helper routines."""
1085
1087
def make_test_knit(self, annotate=False, delay_create=False, index=None,
1088
name='test', delta=True, access_mode='w'):
1087
1089
if not annotate:
1088
1090
factory = KnitPlainFactory()
1091
return KnitVersionedFile(name, get_transport('.'), access_mode='w',
1092
factory=factory, create=True,
1093
delay_create=delay_create, index=index)
1094
index = _KnitIndex(get_transport('.'), name + INDEX_SUFFIX,
1095
access_mode, create=True, file_mode=None,
1096
create_parent_dir=False, delay_create=delay_create,
1097
dir_mode=None, get_scope=lambda:None)
1098
access = _KnitAccess(get_transport('.'), name + DATA_SUFFIX, None,
1099
None, delay_create, False)
1100
return KnitVersionedFile(name, get_transport('.'), factory=factory,
1101
create=True, delay_create=delay_create, index=index,
1102
access_method=access, delta=delta)
1095
1104
def assertRecordContentEqual(self, knit, version_id, candidate_content):
1096
1105
"""Assert that some raw record content matches the raw record content
1115
1124
def test_make_explicit_index(self):
1116
1125
"""We can supply an index to use."""
1117
1126
knit = KnitVersionedFile('test', get_transport('.'),
1118
index='strangelove')
1127
index='strangelove', access_method="a")
1119
1128
self.assertEqual(knit._index, 'strangelove')
1121
1130
def test_knit_add(self):
1162
1171
k = self.make_test_knit()
1163
1172
k.add_lines('text-1', [], split_lines(TEXT_1))
1165
k2 = KnitVersionedFile('test', get_transport('.'), access_mode='r', factory=KnitPlainFactory(), create=True)
1174
k2 = make_file_knit('test', get_transport('.'), access_mode='r',
1175
factory=KnitPlainFactory(), create=True)
1166
1176
self.assertTrue(k2.has_version('text-1'))
1167
1177
self.assertEqualDiff(''.join(k2.get_lines('text-1')), TEXT_1)
1190
1200
def test_incomplete(self):
1191
1201
"""Test if texts without a ending line-end can be inserted and
1193
k = KnitVersionedFile('test', get_transport('.'), delta=False, create=True)
1203
k = make_file_knit('test', get_transport('.'), delta=False, create=True)
1194
1204
k.add_lines('text-1', [], ['a\n', 'b' ])
1195
1205
k.add_lines('text-2', ['text-1'], ['a\rb\n', 'b\n'])
1196
1206
# reopening ensures maximum room for confusion
1197
k = KnitVersionedFile('test', get_transport('.'), delta=False, create=True)
1207
k = make_file_knit('test', get_transport('.'), delta=False, create=True)
1198
1208
self.assertEquals(k.get_lines('text-1'), ['a\n', 'b' ])
1199
1209
self.assertEquals(k.get_lines('text-2'), ['a\rb\n', 'b\n'])
1223
1233
def test_add_delta(self):
1224
1234
"""Store in knit with parents"""
1225
k = KnitVersionedFile('test', get_transport('.'), factory=KnitPlainFactory(),
1226
delta=True, create=True)
1235
k = self.make_test_knit(annotate=False)
1227
1236
self.add_stock_one_and_one_a(k)
1228
1237
k.clear_cache()
1229
1238
self.assertEqualDiff(''.join(k.get_lines('text-1a')), TEXT_1A)
1233
1242
index = InMemoryGraphIndex(2)
1234
1243
knit_index = KnitGraphIndex(index, add_callback=index.add_nodes,
1236
k = KnitVersionedFile('test', get_transport('.'),
1237
delta=True, create=True, index=knit_index)
1245
k = self.make_test_knit(annotate=True, index=knit_index)
1238
1246
self.add_stock_one_and_one_a(k)
1239
1247
k.clear_cache()
1240
1248
self.assertEqualDiff(''.join(k.get_lines('text-1a')), TEXT_1A)
1249
1257
def test_annotate(self):
1250
1258
"""Annotations"""
1251
k = KnitVersionedFile('knit', get_transport('.'), factory=KnitAnnotateFactory(),
1252
delta=True, create=True)
1259
k = self.make_test_knit(annotate=True, name='knit')
1253
1260
self.insert_and_test_small_annotate(k)
1255
1262
def insert_and_test_small_annotate(self, k):
1264
1271
def test_annotate_fulltext(self):
1265
1272
"""Annotations"""
1266
k = KnitVersionedFile('knit', get_transport('.'), factory=KnitAnnotateFactory(),
1267
delta=False, create=True)
1273
k = self.make_test_knit(annotate=True, name='knit', delta=False)
1268
1274
self.insert_and_test_small_annotate(k)
1270
1276
def test_annotate_merge_1(self):
1341
1347
self.assertEquals(origins[2], ('text-1', 'c\n'))
1343
1349
def _test_join_with_factories(self, k1_factory, k2_factory):
1344
k1 = KnitVersionedFile('test1', get_transport('.'), factory=k1_factory, create=True)
1350
k1 = make_file_knit('test1', get_transport('.'), factory=k1_factory, create=True)
1345
1351
k1.add_lines('text-a', [], ['a1\n', 'a2\n', 'a3\n'])
1346
1352
k1.add_lines('text-b', ['text-a'], ['a1\n', 'b2\n', 'a3\n'])
1347
1353
k1.add_lines('text-c', [], ['c1\n', 'c2\n', 'c3\n'])
1348
1354
k1.add_lines('text-d', ['text-c'], ['c1\n', 'd2\n', 'd3\n'])
1349
1355
k1.add_lines('text-m', ['text-b', 'text-d'], ['a1\n', 'b2\n', 'd3\n'])
1350
k2 = KnitVersionedFile('test2', get_transport('.'), factory=k2_factory, create=True)
1356
k2 = make_file_knit('test2', get_transport('.'), factory=k2_factory, create=True)
1351
1357
count = k2.join(k1, version_ids=['text-m'])
1352
1358
self.assertEquals(count, 5)
1353
1359
self.assertTrue(k2.has_version('text-a'))
1374
1380
self._test_join_with_factories(KnitPlainFactory(), None)
1376
1382
def test_reannotate(self):
1377
k1 = KnitVersionedFile('knit1', get_transport('.'),
1383
k1 = make_file_knit('knit1', get_transport('.'),
1378
1384
factory=KnitAnnotateFactory(), create=True)
1380
1386
k1.add_lines('text-a', [], ['a\n', 'b\n'])
1382
1388
k1.add_lines('text-b', ['text-a'], ['a\n', 'c\n'])
1384
k2 = KnitVersionedFile('test2', get_transport('.'),
1390
k2 = make_file_knit('test2', get_transport('.'),
1385
1391
factory=KnitAnnotateFactory(), create=True)
1386
1392
k2.join(k1, version_ids=['text-b'])
1405
1411
def test_get_line_delta_texts(self):
1406
1412
"""Make sure we can call get_texts on text with reused line deltas"""
1407
k1 = KnitVersionedFile('test1', get_transport('.'),
1408
factory=KnitPlainFactory(), create=True)
1413
k1 = make_file_knit('test1', get_transport('.'),
1414
factory=KnitPlainFactory(), create=True)
1409
1415
for t in range(3):
1417
1423
def test_iter_lines_reads_in_order(self):
1418
1424
instrumented_t = get_transport('trace+memory:///')
1419
k1 = KnitVersionedFile('id', instrumented_t, create=True, delta=True)
1425
k1 = make_file_knit('id', instrumented_t, create=True, delta=True)
1420
1426
self.assertEqual([('get', 'id.kndx',)], instrumented_t._activity)
1421
1427
# add texts with no required ordering
1422
1428
k1.add_lines('base', [], ['text\n'])
1452
1458
"\nrevid2 line-delta 84 82 0 :",
1454
1460
# we should be able to load this file again
1455
knit = KnitVersionedFile('test', get_transport('.'), access_mode='r')
1461
knit = make_file_knit('test', get_transport('.'), access_mode='r')
1456
1462
self.assertEqual(['revid', 'revid2'], knit.versions())
1457
1463
# write a short write to the file and ensure that its ignored
1458
1464
indexfile = file('test.kndx', 'ab')
1459
1465
indexfile.write('\nrevid3 line-delta 166 82 1 2 3 4 5 .phwoar:demo ')
1460
1466
indexfile.close()
1461
1467
# we should be able to load this file again
1462
knit = KnitVersionedFile('test', get_transport('.'), access_mode='w')
1468
knit = make_file_knit('test', get_transport('.'), access_mode='w')
1463
1469
self.assertEqual(['revid', 'revid2'], knit.versions())
1464
1470
# and add a revision with the same id the failed write had
1465
1471
knit.add_lines('revid3', ['revid2'], ['a\n'])
1466
1472
# and when reading it revid3 should now appear.
1467
knit = KnitVersionedFile('test', get_transport('.'), access_mode='r')
1473
knit = make_file_knit('test', get_transport('.'), access_mode='r')
1468
1474
self.assertEqual(['revid', 'revid2', 'revid3'], knit.versions())
1469
1475
self.assertEqual({'revid3':('revid2',)}, knit.get_parent_map(['revid3']))
1485
1491
"""create_parent_dir can create knits in nonexistant dirs"""
1486
1492
# Has no effect if we don't set 'delay_create'
1487
1493
trans = get_transport('.')
1488
self.assertRaises(NoSuchFile, KnitVersionedFile, 'dir/test',
1494
self.assertRaises(NoSuchFile, make_file_knit, 'dir/test',
1489
1495
trans, access_mode='w', factory=None,
1490
1496
create=True, create_parent_dir=True)
1491
1497
# Nothing should have changed yet
1492
knit = KnitVersionedFile('dir/test', trans, access_mode='w',
1498
knit = make_file_knit('dir/test', trans, access_mode='w',
1493
1499
factory=None, create=True,
1494
1500
create_parent_dir=True,
1495
1501
delay_create=True)
1510
1516
if not trans._can_roundtrip_unix_modebits():
1511
1517
# Can't roundtrip, so no need to run this test
1513
knit = KnitVersionedFile('dir/test', trans, access_mode='w',
1514
factory=None, create=True,
1515
create_parent_dir=True,
1519
knit = make_file_knit('dir/test', trans, access_mode='w', factory=None,
1520
create=True, create_parent_dir=True, delay_create=True,
1521
file_mode=0600, dir_mode=0700)
1519
1522
knit.add_lines('revid', [], ['a\n'])
1520
1523
self.assertTransportMode(trans, 'dir', 0700)
1521
1524
self.assertTransportMode(trans, 'dir/test.knit', 0600)
1526
1529
if not trans._can_roundtrip_unix_modebits():
1527
1530
# Can't roundtrip, so no need to run this test
1529
knit = KnitVersionedFile('dir/test', trans, access_mode='w',
1530
factory=None, create=True,
1531
create_parent_dir=True,
1532
knit = make_file_knit('dir/test', trans, access_mode='w', factory=None,
1533
create=True, create_parent_dir=True, delay_create=True,
1534
file_mode=0660, dir_mode=0770)
1535
1535
knit.add_lines('revid', [], ['a\n'])
1536
1536
self.assertTransportMode(trans, 'dir', 0770)
1537
1537
self.assertTransportMode(trans, 'dir/test.knit', 0660)
1542
1542
if not trans._can_roundtrip_unix_modebits():
1543
1543
# Can't roundtrip, so no need to run this test
1545
knit = KnitVersionedFile('dir/test', trans, access_mode='w',
1546
factory=None, create=True,
1547
create_parent_dir=True,
1545
knit = make_file_knit('dir/test', trans, access_mode='w', factory=None,
1546
create=True, create_parent_dir=True, delay_create=True,
1547
file_mode=0666, dir_mode=0777)
1551
1548
knit.add_lines('revid', [], ['a\n'])
1552
1549
self.assertTransportMode(trans, 'dir', 0777)
1553
1550
self.assertTransportMode(trans, 'dir/test.knit', 0666)
2056
2053
def test_weave_to_knit_matches(self):
2057
2054
# check that the WeaveToKnit is_compatible function
2058
2055
# registers True for a Weave to a Knit.
2056
w = Weave(get_scope=lambda:None)
2060
2057
k = self.make_test_knit()
2061
2058
self.failUnless(WeaveToKnit.is_compatible(w, k))
2062
2059
self.failIf(WeaveToKnit.is_compatible(k, w))
2981
2978
knit.get_data_stream([]))
2982
2979
self.assertRaises(errors.KnitCorrupt,
2983
2980
list, access.get_raw_records([(True, "A", None, None)]))
2983
class TestFormatSignatures(KnitTests):
2985
def test_knit_format_signatures(self):
2986
"""Different formats of knit have different signature strings."""
2987
knit = self.make_test_knit(name='a', annotate=True)
2988
self.assertEqual('knit-annotated', knit.get_format_signature())
2989
knit = self.make_test_knit(name='p', annotate=False)
2990
self.assertEqual('knit-plain', knit.get_format_signature())