87
95
self.assertRaises(StopIteration, it.next)
98
class MockTransport(object):
    """Minimal transport stand-in that records the calls made on it.

    ``get`` serves canned file content.  Any other attribute lookup yields a
    callable that appends ``(name, args, kwargs)`` to ``calls``, so a test can
    assert on the exact sequence of transport operations that were issued.
    """

    def __init__(self, file_lines=None):
        """Create the mock.

        :param file_lines: Lines that ``get`` returns joined with newlines,
            or None to make ``get`` raise NoSuchFile.
        """
        self.file_lines = file_lines
        # Must be set here as a real attribute: __getattr__ appends to it,
        # and tests compare it with [] and pop recorded calls from it.
        self.calls = []
        # We have no base directory for the MockTransport
        self.base = ''

    def get(self, filename):
        """Return a file-like object over the canned lines.

        :param filename: Name being requested; only used in the error.
        :raises NoSuchFile: If the mock was created without file_lines.
        """
        if self.file_lines is None:
            raise NoSuchFile(filename)
        return StringIO("\n".join(self.file_lines))

    def __getattr__(self, name):
        """Return a recorder for any method the mock does not define.

        Only reached for attributes that normal lookup cannot find, so
        ``file_lines``, ``calls``, ``base`` and ``get`` are unaffected.
        """
        def queue_call(*args, **kwargs):
            self.calls.append((name, args, kwargs))
        # Hand the recorder back; without this the looked-up "method" would
        # be None and calling it would fail instead of being logged.
        return queue_call
118
class LowLevelKnitIndexTests(TestCase):
120
def test_no_such_file(self):
    """Opening an index over a transport with no file raises NoSuchFile.

    Checked both for read mode and for write mode with create=False.
    """
    # A MockTransport built without file_lines raises NoSuchFile from get().
    transport = MockTransport()

    self.assertRaises(NoSuchFile, _KnitIndex, transport, "filename", "r")
    self.assertRaises(NoSuchFile, _KnitIndex, transport,
        "filename", "w", create=False)
127
def test_create_file(self):
    """Constructing an index with create=True issues one header write.

    The first call recorded by the transport must be a
    ``put_bytes_non_atomic`` of the index header using the given file mode.
    """
    transport = MockTransport()

    index = _KnitIndex(transport, "filename", "w",
        file_mode="wb", create=True)
    self.assertEqual(
        ("put_bytes_non_atomic",
            ("filename", index.HEADER), {"mode": "wb"}),
        transport.calls.pop(0))
137
def test_delay_create_file(self):
138
transport = MockTransport()
140
index = _KnitIndex(transport, "filename", "w",
141
create=True, file_mode="wb", create_parent_dir=True,
142
delay_create=True, dir_mode=0777)
143
self.assertEqual([], transport.calls)
145
index.add_versions([])
146
name, (filename, f), kwargs = transport.calls.pop(0)
147
self.assertEqual("put_file_non_atomic", name)
149
{"dir_mode": 0777, "create_parent_dir": True, "mode": "wb"},
151
self.assertEqual("filename", filename)
152
self.assertEqual(index.HEADER, f.read())
154
index.add_versions([])
155
self.assertEqual(("append_bytes", ("filename", ""), {}),
156
transport.calls.pop(0))
158
def test_read_utf8_version_id(self):
159
transport = MockTransport([
161
u"version-\N{CYRILLIC CAPITAL LETTER A}"
162
u" option 0 1 :".encode("utf-8")
164
index = _KnitIndex(transport, "filename", "r")
166
index.has_version(u"version-\N{CYRILLIC CAPITAL LETTER A}"))
168
def test_read_utf8_parents(self):
169
transport = MockTransport([
171
u"version option 0 1"
172
u" .version-\N{CYRILLIC CAPITAL LETTER A} :".encode("utf-8")
174
index = _KnitIndex(transport, "filename", "r")
175
self.assertEqual([u"version-\N{CYRILLIC CAPITAL LETTER A}"],
176
index.get_parents_with_ghosts("version"))
178
def test_read_ignore_corrupted_lines(self):
179
transport = MockTransport([
182
"corrupted options 0 1 .b .c ",
183
"version options 0 1 :"
185
index = _KnitIndex(transport, "filename", "r")
186
self.assertEqual(1, index.num_versions())
187
self.assertTrue(index.has_version(u"version"))
189
def test_read_corrupted_header(self):
    """Reading a file whose only line is not the header raises KnitHeaderError."""
    transport = MockTransport(['not a bzr knit index header\n'])
    self.assertRaises(KnitHeaderError,
        _KnitIndex, transport, "filename", "r")
194
def test_read_duplicate_entries(self):
195
transport = MockTransport([
197
"parent options 0 1 :",
198
"version options1 0 1 0 :",
199
"version options2 1 2 .other :",
200
"version options3 3 4 0 .other :"
202
index = _KnitIndex(transport, "filename", "r")
203
self.assertEqual(2, index.num_versions())
204
self.assertEqual(1, index.lookup(u"version"))
205
self.assertEqual((3, 4), index.get_position(u"version"))
206
self.assertEqual(["options3"], index.get_options(u"version"))
207
self.assertEqual([u"parent", u"other"],
208
index.get_parents_with_ghosts(u"version"))
210
def test_read_compressed_parents(self):
211
transport = MockTransport([
215
"c option 0 1 1 0 :",
217
index = _KnitIndex(transport, "filename", "r")
218
self.assertEqual([u"a"], index.get_parents(u"b"))
219
self.assertEqual([u"b", u"a"], index.get_parents(u"c"))
221
def test_write_utf8_version_id(self):
222
transport = MockTransport([
225
index = _KnitIndex(transport, "filename", "r")
226
index.add_version(u"version-\N{CYRILLIC CAPITAL LETTER A}",
227
["option"], 0, 1, [])
228
self.assertEqual(("append_bytes", ("filename",
229
u"\nversion-\N{CYRILLIC CAPITAL LETTER A}"
230
u" option 0 1 :".encode("utf-8")),
232
transport.calls.pop(0))
234
def test_write_utf8_parents(self):
235
transport = MockTransport([
238
index = _KnitIndex(transport, "filename", "r")
239
index.add_version(u"version", ["option"], 0, 1,
240
[u"version-\N{CYRILLIC CAPITAL LETTER A}"])
241
self.assertEqual(("append_bytes", ("filename",
242
u"\nversion option 0 1"
243
u" .version-\N{CYRILLIC CAPITAL LETTER A} :".encode("utf-8")),
245
transport.calls.pop(0))
247
def test_get_graph(self):
    """get_graph pairs every added version with its list of parents."""
    transport = MockTransport()
    index = _KnitIndex(transport, "filename", "w", create=True)
    self.assertEqual([], index.get_graph())

    index.add_version(u"a", ["option"], 0, 1, [u"b"])
    self.assertEqual([(u"a", [u"b"])], index.get_graph())

    index.add_version(u"c", ["option"], 0, 1, [u"d"])
    # Sorted before comparing, so this check does not depend on the order
    # in which get_graph returns the two entries.
    self.assertEqual([(u"a", [u"b"]), (u"c", [u"d"])],
        sorted(index.get_graph()))
259
def test_get_ancestry(self):
260
transport = MockTransport([
263
"b option 0 1 0 .e :",
264
"c option 0 1 1 0 :",
265
"d option 0 1 2 .f :"
267
index = _KnitIndex(transport, "filename", "r")
269
self.assertEqual([], index.get_ancestry([]))
270
self.assertEqual([u"a"], index.get_ancestry([u"a"]))
271
self.assertEqual([u"a", u"b"], index.get_ancestry([u"b"]))
272
self.assertEqual([u"a", u"b", u"c"], index.get_ancestry([u"c"]))
273
self.assertEqual([u"a", u"b", u"c", u"d"], index.get_ancestry([u"d"]))
274
self.assertEqual([u"a", u"b"], index.get_ancestry([u"a", u"b"]))
275
self.assertEqual([u"a", u"b", u"c"], index.get_ancestry([u"a", u"c"]))
277
self.assertRaises(RevisionNotPresent, index.get_ancestry, [u"e"])
279
def test_get_ancestry_with_ghosts(self):
280
transport = MockTransport([
283
"b option 0 1 0 .e :",
284
"c option 0 1 0 .f .g :",
285
"d option 0 1 2 .h .j .k :"
287
index = _KnitIndex(transport, "filename", "r")
289
self.assertEqual([], index.get_ancestry_with_ghosts([]))
290
self.assertEqual([u"a"], index.get_ancestry_with_ghosts([u"a"]))
291
self.assertEqual([u"a", u"e", u"b"],
292
index.get_ancestry_with_ghosts([u"b"]))
293
self.assertEqual([u"a", u"g", u"f", u"c"],
294
index.get_ancestry_with_ghosts([u"c"]))
295
self.assertEqual([u"a", u"g", u"f", u"c", u"k", u"j", u"h", u"d"],
296
index.get_ancestry_with_ghosts([u"d"]))
297
self.assertEqual([u"a", u"e", u"b"],
298
index.get_ancestry_with_ghosts([u"a", u"b"]))
299
self.assertEqual([u"a", u"g", u"f", u"c"],
300
index.get_ancestry_with_ghosts([u"a", u"c"]))
302
[u"a", u"g", u"f", u"c", u"e", u"b", u"k", u"j", u"h", u"d"],
303
index.get_ancestry_with_ghosts([u"b", u"d"]))
305
self.assertRaises(RevisionNotPresent,
306
index.get_ancestry_with_ghosts, [u"e"])
308
def test_num_versions(self):
309
transport = MockTransport([
312
index = _KnitIndex(transport, "filename", "r")
314
self.assertEqual(0, index.num_versions())
315
self.assertEqual(0, len(index))
317
index.add_version(u"a", ["option"], 0, 1, [])
318
self.assertEqual(1, index.num_versions())
319
self.assertEqual(1, len(index))
321
index.add_version(u"a", ["option2"], 1, 2, [])
322
self.assertEqual(1, index.num_versions())
323
self.assertEqual(1, len(index))
325
index.add_version(u"b", ["option"], 0, 1, [])
326
self.assertEqual(2, index.num_versions())
327
self.assertEqual(2, len(index))
329
def test_get_versions(self):
330
transport = MockTransport([
333
index = _KnitIndex(transport, "filename", "r")
335
self.assertEqual([], index.get_versions())
337
index.add_version(u"a", ["option"], 0, 1, [])
338
self.assertEqual([u"a"], index.get_versions())
340
index.add_version(u"a", ["option"], 0, 1, [])
341
self.assertEqual([u"a"], index.get_versions())
343
index.add_version(u"b", ["option"], 0, 1, [])
344
self.assertEqual([u"a", u"b"], index.get_versions())
346
def test_idx_to_name(self):
347
transport = MockTransport([
352
index = _KnitIndex(transport, "filename", "r")
354
self.assertEqual(u"a", index.idx_to_name(0))
355
self.assertEqual(u"b", index.idx_to_name(1))
356
self.assertEqual(u"b", index.idx_to_name(-1))
357
self.assertEqual(u"a", index.idx_to_name(-2))
359
def test_lookup(self):
360
transport = MockTransport([
365
index = _KnitIndex(transport, "filename", "r")
367
self.assertEqual(0, index.lookup(u"a"))
368
self.assertEqual(1, index.lookup(u"b"))
370
def test_add_version(self):
371
transport = MockTransport([
374
index = _KnitIndex(transport, "filename", "r")
376
index.add_version(u"a", ["option"], 0, 1, [u"b"])
377
self.assertEqual(("append_bytes",
378
("filename", "\na option 0 1 .b :"),
379
{}), transport.calls.pop(0))
380
self.assertTrue(index.has_version(u"a"))
381
self.assertEqual(1, index.num_versions())
382
self.assertEqual((0, 1), index.get_position(u"a"))
383
self.assertEqual(["option"], index.get_options(u"a"))
384
self.assertEqual([u"b"], index.get_parents_with_ghosts(u"a"))
386
index.add_version(u"a", ["opt"], 1, 2, [u"c"])
387
self.assertEqual(("append_bytes",
388
("filename", "\na opt 1 2 .c :"),
389
{}), transport.calls.pop(0))
390
self.assertTrue(index.has_version(u"a"))
391
self.assertEqual(1, index.num_versions())
392
self.assertEqual((1, 2), index.get_position(u"a"))
393
self.assertEqual(["opt"], index.get_options(u"a"))
394
self.assertEqual([u"c"], index.get_parents_with_ghosts(u"a"))
396
index.add_version(u"b", ["option"], 2, 3, [u"a"])
397
self.assertEqual(("append_bytes",
398
("filename", "\nb option 2 3 0 :"),
399
{}), transport.calls.pop(0))
400
self.assertTrue(index.has_version(u"b"))
401
self.assertEqual(2, index.num_versions())
402
self.assertEqual((2, 3), index.get_position(u"b"))
403
self.assertEqual(["option"], index.get_options(u"b"))
404
self.assertEqual([u"a"], index.get_parents_with_ghosts(u"b"))
406
def test_add_versions(self):
407
transport = MockTransport([
410
index = _KnitIndex(transport, "filename", "r")
413
(u"a", ["option"], 0, 1, [u"b"]),
414
(u"a", ["opt"], 1, 2, [u"c"]),
415
(u"b", ["option"], 2, 3, [u"a"])
417
self.assertEqual(("append_bytes", ("filename",
418
"\na option 0 1 .b :"
421
), {}), transport.calls.pop(0))
422
self.assertTrue(index.has_version(u"a"))
423
self.assertTrue(index.has_version(u"b"))
424
self.assertEqual(2, index.num_versions())
425
self.assertEqual((1, 2), index.get_position(u"a"))
426
self.assertEqual((2, 3), index.get_position(u"b"))
427
self.assertEqual(["opt"], index.get_options(u"a"))
428
self.assertEqual(["option"], index.get_options(u"b"))
429
self.assertEqual([u"c"], index.get_parents_with_ghosts(u"a"))
430
self.assertEqual([u"a"], index.get_parents_with_ghosts(u"b"))
432
def test_delay_create_and_add_versions(self):
433
transport = MockTransport()
435
index = _KnitIndex(transport, "filename", "w",
436
create=True, file_mode="wb", create_parent_dir=True,
437
delay_create=True, dir_mode=0777)
438
self.assertEqual([], transport.calls)
441
(u"a", ["option"], 0, 1, [u"b"]),
442
(u"a", ["opt"], 1, 2, [u"c"]),
443
(u"b", ["option"], 2, 3, [u"a"])
445
name, (filename, f), kwargs = transport.calls.pop(0)
446
self.assertEqual("put_file_non_atomic", name)
448
{"dir_mode": 0777, "create_parent_dir": True, "mode": "wb"},
450
self.assertEqual("filename", filename)
453
"\na option 0 1 .b :"
455
"\nb option 2 3 0 :",
458
def test_has_version(self):
459
transport = MockTransport([
463
index = _KnitIndex(transport, "filename", "r")
465
self.assertTrue(index.has_version(u"a"))
466
self.assertFalse(index.has_version(u"b"))
468
def test_get_position(self):
469
transport = MockTransport([
474
index = _KnitIndex(transport, "filename", "r")
476
self.assertEqual((0, 1), index.get_position(u"a"))
477
self.assertEqual((1, 2), index.get_position(u"b"))
479
def test_get_method(self):
480
transport = MockTransport([
482
"a fulltext,unknown 0 1 :",
483
"b unknown,line-delta 1 2 :",
486
index = _KnitIndex(transport, "filename", "r")
488
self.assertEqual("fulltext", index.get_method(u"a"))
489
self.assertEqual("line-delta", index.get_method(u"b"))
490
self.assertRaises(errors.KnitIndexUnknownMethod, index.get_method, u"c")
492
def test_get_options(self):
493
transport = MockTransport([
498
index = _KnitIndex(transport, "filename", "r")
500
self.assertEqual(["opt1"], index.get_options(u"a"))
501
self.assertEqual(["opt2", "opt3"], index.get_options(u"b"))
503
def test_get_parents(self):
504
transport = MockTransport([
507
"b option 1 2 0 .c :",
508
"c option 1 2 1 0 .e :"
510
index = _KnitIndex(transport, "filename", "r")
512
self.assertEqual([], index.get_parents(u"a"))
513
self.assertEqual([u"a", u"c"], index.get_parents(u"b"))
514
self.assertEqual([u"b", u"a"], index.get_parents(u"c"))
516
def test_get_parents_with_ghosts(self):
517
transport = MockTransport([
520
"b option 1 2 0 .c :",
521
"c option 1 2 1 0 .e :"
523
index = _KnitIndex(transport, "filename", "r")
525
self.assertEqual([], index.get_parents_with_ghosts(u"a"))
526
self.assertEqual([u"a", u"c"], index.get_parents_with_ghosts(u"b"))
527
self.assertEqual([u"b", u"a", u"e"],
528
index.get_parents_with_ghosts(u"c"))
530
def test_check_versions_present(self):
531
transport = MockTransport([
536
index = _KnitIndex(transport, "filename", "r")
538
check = index.check_versions_present
544
self.assertRaises(RevisionNotPresent, check, [u"c"])
545
self.assertRaises(RevisionNotPresent, check, [u"a", u"b", u"c"])
90
548
class KnitTests(TestCaseWithTransport):
91
549
"""Class containing knit test helper routines."""