/brz/remove-bazaar

To get this branch, use:
bzr branch http://gegoxaren.bato24.eu/bzr/brz/remove-bazaar

« back to all changes in this revision

Viewing changes to bzrlib/tests/test_knit.py

Merge from bzr.dev

Show diffs side-by-side

added added

removed removed

Lines of Context:
16
16
 
17
17
"""Tests for Knit data structure"""
18
18
 
19
 
 
 
19
from cStringIO import StringIO
20
20
import difflib
21
21
 
22
 
 
23
 
from bzrlib import errors
24
 
from bzrlib.errors import KnitError, RevisionAlreadyPresent, NoSuchFile
 
22
from bzrlib import (
 
23
    errors,
 
24
    )
 
25
from bzrlib.errors import (
 
26
    RevisionAlreadyPresent,
 
27
    KnitHeaderError,
 
28
    RevisionNotPresent,
 
29
    NoSuchFile,
 
30
    )
25
31
from bzrlib.knit import (
26
32
    KnitContent,
27
33
    KnitVersionedFile,
28
34
    KnitPlainFactory,
29
35
    KnitAnnotateFactory,
30
 
    WeaveToKnit)
 
36
    _KnitIndex,
 
37
    WeaveToKnit,
 
38
    )
31
39
from bzrlib.osutils import split_lines
32
40
from bzrlib.tests import TestCase, TestCaseWithTransport
33
41
from bzrlib.transport import TransportLogger, get_transport
87
95
        self.assertRaises(StopIteration, it.next)
88
96
 
89
97
 
 
98
class MockTransport(object):
 
99
 
 
100
    def __init__(self, file_lines=None):
 
101
        self.file_lines = file_lines
 
102
        self.calls = []
 
103
        # We have no base directory for the MockTransport
 
104
        self.base = ''
 
105
 
 
106
    def get(self, filename):
 
107
        if self.file_lines is None:
 
108
            raise NoSuchFile(filename)
 
109
        else:
 
110
            return StringIO("\n".join(self.file_lines))
 
111
 
 
112
    def __getattr__(self, name):
 
113
        def queue_call(*args, **kwargs):
 
114
            self.calls.append((name, args, kwargs))
 
115
        return queue_call
 
116
 
 
117
 
 
118
class LowLevelKnitIndexTests(TestCase):
 
119
 
 
120
    def test_no_such_file(self):
 
121
        transport = MockTransport()
 
122
 
 
123
        self.assertRaises(NoSuchFile, _KnitIndex, transport, "filename", "r")
 
124
        self.assertRaises(NoSuchFile, _KnitIndex, transport,
 
125
            "filename", "w", create=False)
 
126
 
 
127
    def test_create_file(self):
 
128
        transport = MockTransport()
 
129
 
 
130
        index = _KnitIndex(transport, "filename", "w",
 
131
            file_mode="wb", create=True)
 
132
        self.assertEqual(
 
133
                ("put_bytes_non_atomic",
 
134
                    ("filename", index.HEADER), {"mode": "wb"}),
 
135
                transport.calls.pop(0))
 
136
 
 
137
    def test_delay_create_file(self):
 
138
        transport = MockTransport()
 
139
 
 
140
        index = _KnitIndex(transport, "filename", "w",
 
141
            create=True, file_mode="wb", create_parent_dir=True,
 
142
            delay_create=True, dir_mode=0777)
 
143
        self.assertEqual([], transport.calls)
 
144
 
 
145
        index.add_versions([])
 
146
        name, (filename, f), kwargs = transport.calls.pop(0)
 
147
        self.assertEqual("put_file_non_atomic", name)
 
148
        self.assertEqual(
 
149
            {"dir_mode": 0777, "create_parent_dir": True, "mode": "wb"},
 
150
            kwargs)
 
151
        self.assertEqual("filename", filename)
 
152
        self.assertEqual(index.HEADER, f.read())
 
153
 
 
154
        index.add_versions([])
 
155
        self.assertEqual(("append_bytes", ("filename", ""), {}),
 
156
            transport.calls.pop(0))
 
157
 
 
158
    def test_read_utf8_version_id(self):
 
159
        transport = MockTransport([
 
160
            _KnitIndex.HEADER,
 
161
            u"version-\N{CYRILLIC CAPITAL LETTER A}"
 
162
                u" option 0 1 :".encode("utf-8")
 
163
            ])
 
164
        index = _KnitIndex(transport, "filename", "r")
 
165
        self.assertTrue(
 
166
            index.has_version(u"version-\N{CYRILLIC CAPITAL LETTER A}"))
 
167
 
 
168
    def test_read_utf8_parents(self):
 
169
        transport = MockTransport([
 
170
            _KnitIndex.HEADER,
 
171
            u"version option 0 1"
 
172
                u" .version-\N{CYRILLIC CAPITAL LETTER A} :".encode("utf-8")
 
173
            ])
 
174
        index = _KnitIndex(transport, "filename", "r")
 
175
        self.assertEqual([u"version-\N{CYRILLIC CAPITAL LETTER A}"],
 
176
            index.get_parents_with_ghosts("version"))
 
177
 
 
178
    def test_read_ignore_corrupted_lines(self):
 
179
        transport = MockTransport([
 
180
            _KnitIndex.HEADER,
 
181
            "corrupted",
 
182
            "corrupted options 0 1 .b .c ",
 
183
            "version options 0 1 :"
 
184
            ])
 
185
        index = _KnitIndex(transport, "filename", "r")
 
186
        self.assertEqual(1, index.num_versions())
 
187
        self.assertTrue(index.has_version(u"version"))
 
188
 
 
189
    def test_read_corrupted_header(self):
 
190
        transport = MockTransport(['not a bzr knit index header\n'])
 
191
        self.assertRaises(KnitHeaderError,
 
192
            _KnitIndex, transport, "filename", "r")
 
193
 
 
194
    def test_read_duplicate_entries(self):
 
195
        transport = MockTransport([
 
196
            _KnitIndex.HEADER,
 
197
            "parent options 0 1 :",
 
198
            "version options1 0 1 0 :",
 
199
            "version options2 1 2 .other :",
 
200
            "version options3 3 4 0 .other :"
 
201
            ])
 
202
        index = _KnitIndex(transport, "filename", "r")
 
203
        self.assertEqual(2, index.num_versions())
 
204
        self.assertEqual(1, index.lookup(u"version"))
 
205
        self.assertEqual((3, 4), index.get_position(u"version"))
 
206
        self.assertEqual(["options3"], index.get_options(u"version"))
 
207
        self.assertEqual([u"parent", u"other"],
 
208
            index.get_parents_with_ghosts(u"version"))
 
209
 
 
210
    def test_read_compressed_parents(self):
 
211
        transport = MockTransport([
 
212
            _KnitIndex.HEADER,
 
213
            "a option 0 1 :",
 
214
            "b option 0 1 0 :",
 
215
            "c option 0 1 1 0 :",
 
216
            ])
 
217
        index = _KnitIndex(transport, "filename", "r")
 
218
        self.assertEqual([u"a"], index.get_parents(u"b"))
 
219
        self.assertEqual([u"b", u"a"], index.get_parents(u"c"))
 
220
 
 
221
    def test_write_utf8_version_id(self):
 
222
        transport = MockTransport([
 
223
            _KnitIndex.HEADER
 
224
            ])
 
225
        index = _KnitIndex(transport, "filename", "r")
 
226
        index.add_version(u"version-\N{CYRILLIC CAPITAL LETTER A}",
 
227
            ["option"], 0, 1, [])
 
228
        self.assertEqual(("append_bytes", ("filename",
 
229
            u"\nversion-\N{CYRILLIC CAPITAL LETTER A}"
 
230
                u" option 0 1  :".encode("utf-8")),
 
231
            {}),
 
232
            transport.calls.pop(0))
 
233
 
 
234
    def test_write_utf8_parents(self):
 
235
        transport = MockTransport([
 
236
            _KnitIndex.HEADER
 
237
            ])
 
238
        index = _KnitIndex(transport, "filename", "r")
 
239
        index.add_version(u"version", ["option"], 0, 1,
 
240
            [u"version-\N{CYRILLIC CAPITAL LETTER A}"])
 
241
        self.assertEqual(("append_bytes", ("filename",
 
242
            u"\nversion option 0 1"
 
243
                u" .version-\N{CYRILLIC CAPITAL LETTER A} :".encode("utf-8")),
 
244
            {}),
 
245
            transport.calls.pop(0))
 
246
 
 
247
    def test_get_graph(self):
 
248
        transport = MockTransport()
 
249
        index = _KnitIndex(transport, "filename", "w", create=True)
 
250
        self.assertEqual([], index.get_graph())
 
251
 
 
252
        index.add_version(u"a", ["option"], 0, 1, [u"b"])
 
253
        self.assertEqual([(u"a", [u"b"])], index.get_graph())
 
254
 
 
255
        index.add_version(u"c", ["option"], 0, 1, [u"d"])
 
256
        self.assertEqual([(u"a", [u"b"]), (u"c", [u"d"])],
 
257
            sorted(index.get_graph()))
 
258
 
 
259
    def test_get_ancestry(self):
 
260
        transport = MockTransport([
 
261
            _KnitIndex.HEADER,
 
262
            "a option 0 1 :",
 
263
            "b option 0 1 0 .e :",
 
264
            "c option 0 1 1 0 :",
 
265
            "d option 0 1 2 .f :"
 
266
            ])
 
267
        index = _KnitIndex(transport, "filename", "r")
 
268
 
 
269
        self.assertEqual([], index.get_ancestry([]))
 
270
        self.assertEqual([u"a"], index.get_ancestry([u"a"]))
 
271
        self.assertEqual([u"a", u"b"], index.get_ancestry([u"b"]))
 
272
        self.assertEqual([u"a", u"b", u"c"], index.get_ancestry([u"c"]))
 
273
        self.assertEqual([u"a", u"b", u"c", u"d"], index.get_ancestry([u"d"]))
 
274
        self.assertEqual([u"a", u"b"], index.get_ancestry([u"a", u"b"]))
 
275
        self.assertEqual([u"a", u"b", u"c"], index.get_ancestry([u"a", u"c"]))
 
276
 
 
277
        self.assertRaises(RevisionNotPresent, index.get_ancestry, [u"e"])
 
278
 
 
279
    def test_get_ancestry_with_ghosts(self):
 
280
        transport = MockTransport([
 
281
            _KnitIndex.HEADER,
 
282
            "a option 0 1 :",
 
283
            "b option 0 1 0 .e :",
 
284
            "c option 0 1 0 .f .g :",
 
285
            "d option 0 1 2 .h .j .k :"
 
286
            ])
 
287
        index = _KnitIndex(transport, "filename", "r")
 
288
 
 
289
        self.assertEqual([], index.get_ancestry_with_ghosts([]))
 
290
        self.assertEqual([u"a"], index.get_ancestry_with_ghosts([u"a"]))
 
291
        self.assertEqual([u"a", u"e", u"b"],
 
292
            index.get_ancestry_with_ghosts([u"b"]))
 
293
        self.assertEqual([u"a", u"g", u"f", u"c"],
 
294
            index.get_ancestry_with_ghosts([u"c"]))
 
295
        self.assertEqual([u"a", u"g", u"f", u"c", u"k", u"j", u"h", u"d"],
 
296
            index.get_ancestry_with_ghosts([u"d"]))
 
297
        self.assertEqual([u"a", u"e", u"b"],
 
298
            index.get_ancestry_with_ghosts([u"a", u"b"]))
 
299
        self.assertEqual([u"a", u"g", u"f", u"c"],
 
300
            index.get_ancestry_with_ghosts([u"a", u"c"]))
 
301
        self.assertEqual(
 
302
            [u"a", u"g", u"f", u"c", u"e", u"b", u"k", u"j", u"h", u"d"],
 
303
            index.get_ancestry_with_ghosts([u"b", u"d"]))
 
304
 
 
305
        self.assertRaises(RevisionNotPresent,
 
306
            index.get_ancestry_with_ghosts, [u"e"])
 
307
 
 
308
    def test_num_versions(self):
 
309
        transport = MockTransport([
 
310
            _KnitIndex.HEADER
 
311
            ])
 
312
        index = _KnitIndex(transport, "filename", "r")
 
313
 
 
314
        self.assertEqual(0, index.num_versions())
 
315
        self.assertEqual(0, len(index))
 
316
 
 
317
        index.add_version(u"a", ["option"], 0, 1, [])
 
318
        self.assertEqual(1, index.num_versions())
 
319
        self.assertEqual(1, len(index))
 
320
 
 
321
        index.add_version(u"a", ["option2"], 1, 2, [])
 
322
        self.assertEqual(1, index.num_versions())
 
323
        self.assertEqual(1, len(index))
 
324
 
 
325
        index.add_version(u"b", ["option"], 0, 1, [])
 
326
        self.assertEqual(2, index.num_versions())
 
327
        self.assertEqual(2, len(index))
 
328
 
 
329
    def test_get_versions(self):
 
330
        transport = MockTransport([
 
331
            _KnitIndex.HEADER
 
332
            ])
 
333
        index = _KnitIndex(transport, "filename", "r")
 
334
 
 
335
        self.assertEqual([], index.get_versions())
 
336
 
 
337
        index.add_version(u"a", ["option"], 0, 1, [])
 
338
        self.assertEqual([u"a"], index.get_versions())
 
339
 
 
340
        index.add_version(u"a", ["option"], 0, 1, [])
 
341
        self.assertEqual([u"a"], index.get_versions())
 
342
 
 
343
        index.add_version(u"b", ["option"], 0, 1, [])
 
344
        self.assertEqual([u"a", u"b"], index.get_versions())
 
345
 
 
346
    def test_idx_to_name(self):
 
347
        transport = MockTransport([
 
348
            _KnitIndex.HEADER,
 
349
            "a option 0 1 :",
 
350
            "b option 0 1 :"
 
351
            ])
 
352
        index = _KnitIndex(transport, "filename", "r")
 
353
 
 
354
        self.assertEqual(u"a", index.idx_to_name(0))
 
355
        self.assertEqual(u"b", index.idx_to_name(1))
 
356
        self.assertEqual(u"b", index.idx_to_name(-1))
 
357
        self.assertEqual(u"a", index.idx_to_name(-2))
 
358
 
 
359
    def test_lookup(self):
 
360
        transport = MockTransport([
 
361
            _KnitIndex.HEADER,
 
362
            "a option 0 1 :",
 
363
            "b option 0 1 :"
 
364
            ])
 
365
        index = _KnitIndex(transport, "filename", "r")
 
366
 
 
367
        self.assertEqual(0, index.lookup(u"a"))
 
368
        self.assertEqual(1, index.lookup(u"b"))
 
369
 
 
370
    def test_add_version(self):
 
371
        transport = MockTransport([
 
372
            _KnitIndex.HEADER
 
373
            ])
 
374
        index = _KnitIndex(transport, "filename", "r")
 
375
 
 
376
        index.add_version(u"a", ["option"], 0, 1, [u"b"])
 
377
        self.assertEqual(("append_bytes",
 
378
            ("filename", "\na option 0 1 .b :"),
 
379
            {}), transport.calls.pop(0))
 
380
        self.assertTrue(index.has_version(u"a"))
 
381
        self.assertEqual(1, index.num_versions())
 
382
        self.assertEqual((0, 1), index.get_position(u"a"))
 
383
        self.assertEqual(["option"], index.get_options(u"a"))
 
384
        self.assertEqual([u"b"], index.get_parents_with_ghosts(u"a"))
 
385
 
 
386
        index.add_version(u"a", ["opt"], 1, 2, [u"c"])
 
387
        self.assertEqual(("append_bytes",
 
388
            ("filename", "\na opt 1 2 .c :"),
 
389
            {}), transport.calls.pop(0))
 
390
        self.assertTrue(index.has_version(u"a"))
 
391
        self.assertEqual(1, index.num_versions())
 
392
        self.assertEqual((1, 2), index.get_position(u"a"))
 
393
        self.assertEqual(["opt"], index.get_options(u"a"))
 
394
        self.assertEqual([u"c"], index.get_parents_with_ghosts(u"a"))
 
395
 
 
396
        index.add_version(u"b", ["option"], 2, 3, [u"a"])
 
397
        self.assertEqual(("append_bytes",
 
398
            ("filename", "\nb option 2 3 0 :"),
 
399
            {}), transport.calls.pop(0))
 
400
        self.assertTrue(index.has_version(u"b"))
 
401
        self.assertEqual(2, index.num_versions())
 
402
        self.assertEqual((2, 3), index.get_position(u"b"))
 
403
        self.assertEqual(["option"], index.get_options(u"b"))
 
404
        self.assertEqual([u"a"], index.get_parents_with_ghosts(u"b"))
 
405
 
 
406
    def test_add_versions(self):
 
407
        transport = MockTransport([
 
408
            _KnitIndex.HEADER
 
409
            ])
 
410
        index = _KnitIndex(transport, "filename", "r")
 
411
 
 
412
        index.add_versions([
 
413
            (u"a", ["option"], 0, 1, [u"b"]),
 
414
            (u"a", ["opt"], 1, 2, [u"c"]),
 
415
            (u"b", ["option"], 2, 3, [u"a"])
 
416
            ])
 
417
        self.assertEqual(("append_bytes", ("filename",
 
418
            "\na option 0 1 .b :"
 
419
            "\na opt 1 2 .c :"
 
420
            "\nb option 2 3 0 :"
 
421
            ), {}), transport.calls.pop(0))
 
422
        self.assertTrue(index.has_version(u"a"))
 
423
        self.assertTrue(index.has_version(u"b"))
 
424
        self.assertEqual(2, index.num_versions())
 
425
        self.assertEqual((1, 2), index.get_position(u"a"))
 
426
        self.assertEqual((2, 3), index.get_position(u"b"))
 
427
        self.assertEqual(["opt"], index.get_options(u"a"))
 
428
        self.assertEqual(["option"], index.get_options(u"b"))
 
429
        self.assertEqual([u"c"], index.get_parents_with_ghosts(u"a"))
 
430
        self.assertEqual([u"a"], index.get_parents_with_ghosts(u"b"))
 
431
 
 
432
    def test_delay_create_and_add_versions(self):
 
433
        transport = MockTransport()
 
434
 
 
435
        index = _KnitIndex(transport, "filename", "w",
 
436
            create=True, file_mode="wb", create_parent_dir=True,
 
437
            delay_create=True, dir_mode=0777)
 
438
        self.assertEqual([], transport.calls)
 
439
 
 
440
        index.add_versions([
 
441
            (u"a", ["option"], 0, 1, [u"b"]),
 
442
            (u"a", ["opt"], 1, 2, [u"c"]),
 
443
            (u"b", ["option"], 2, 3, [u"a"])
 
444
            ])
 
445
        name, (filename, f), kwargs = transport.calls.pop(0)
 
446
        self.assertEqual("put_file_non_atomic", name)
 
447
        self.assertEqual(
 
448
            {"dir_mode": 0777, "create_parent_dir": True, "mode": "wb"},
 
449
            kwargs)
 
450
        self.assertEqual("filename", filename)
 
451
        self.assertEqual(
 
452
            index.HEADER +
 
453
            "\na option 0 1 .b :"
 
454
            "\na opt 1 2 .c :"
 
455
            "\nb option 2 3 0 :",
 
456
            f.read())
 
457
 
 
458
    def test_has_version(self):
 
459
        transport = MockTransport([
 
460
            _KnitIndex.HEADER,
 
461
            "a option 0 1 :"
 
462
            ])
 
463
        index = _KnitIndex(transport, "filename", "r")
 
464
 
 
465
        self.assertTrue(index.has_version(u"a"))
 
466
        self.assertFalse(index.has_version(u"b"))
 
467
 
 
468
    def test_get_position(self):
 
469
        transport = MockTransport([
 
470
            _KnitIndex.HEADER,
 
471
            "a option 0 1 :",
 
472
            "b option 1 2 :"
 
473
            ])
 
474
        index = _KnitIndex(transport, "filename", "r")
 
475
 
 
476
        self.assertEqual((0, 1), index.get_position(u"a"))
 
477
        self.assertEqual((1, 2), index.get_position(u"b"))
 
478
 
 
479
    def test_get_method(self):
 
480
        transport = MockTransport([
 
481
            _KnitIndex.HEADER,
 
482
            "a fulltext,unknown 0 1 :",
 
483
            "b unknown,line-delta 1 2 :",
 
484
            "c bad 3 4 :"
 
485
            ])
 
486
        index = _KnitIndex(transport, "filename", "r")
 
487
 
 
488
        self.assertEqual("fulltext", index.get_method(u"a"))
 
489
        self.assertEqual("line-delta", index.get_method(u"b"))
 
490
        self.assertRaises(errors.KnitIndexUnknownMethod, index.get_method, u"c")
 
491
 
 
492
    def test_get_options(self):
 
493
        transport = MockTransport([
 
494
            _KnitIndex.HEADER,
 
495
            "a opt1 0 1 :",
 
496
            "b opt2,opt3 1 2 :"
 
497
            ])
 
498
        index = _KnitIndex(transport, "filename", "r")
 
499
 
 
500
        self.assertEqual(["opt1"], index.get_options(u"a"))
 
501
        self.assertEqual(["opt2", "opt3"], index.get_options(u"b"))
 
502
 
 
503
    def test_get_parents(self):
 
504
        transport = MockTransport([
 
505
            _KnitIndex.HEADER,
 
506
            "a option 0 1 :",
 
507
            "b option 1 2 0 .c :",
 
508
            "c option 1 2 1 0 .e :"
 
509
            ])
 
510
        index = _KnitIndex(transport, "filename", "r")
 
511
 
 
512
        self.assertEqual([], index.get_parents(u"a"))
 
513
        self.assertEqual([u"a", u"c"], index.get_parents(u"b"))
 
514
        self.assertEqual([u"b", u"a"], index.get_parents(u"c"))
 
515
 
 
516
    def test_get_parents_with_ghosts(self):
 
517
        transport = MockTransport([
 
518
            _KnitIndex.HEADER,
 
519
            "a option 0 1 :",
 
520
            "b option 1 2 0 .c :",
 
521
            "c option 1 2 1 0 .e :"
 
522
            ])
 
523
        index = _KnitIndex(transport, "filename", "r")
 
524
 
 
525
        self.assertEqual([], index.get_parents_with_ghosts(u"a"))
 
526
        self.assertEqual([u"a", u"c"], index.get_parents_with_ghosts(u"b"))
 
527
        self.assertEqual([u"b", u"a", u"e"],
 
528
            index.get_parents_with_ghosts(u"c"))
 
529
 
 
530
    def test_check_versions_present(self):
 
531
        transport = MockTransport([
 
532
            _KnitIndex.HEADER,
 
533
            "a option 0 1 :",
 
534
            "b option 0 1 :"
 
535
            ])
 
536
        index = _KnitIndex(transport, "filename", "r")
 
537
 
 
538
        check = index.check_versions_present
 
539
 
 
540
        check([])
 
541
        check([u"a"])
 
542
        check([u"b"])
 
543
        check([u"a", u"b"])
 
544
        self.assertRaises(RevisionNotPresent, check, [u"c"])
 
545
        self.assertRaises(RevisionNotPresent, check, [u"a", u"b", u"c"])
 
546
 
 
547
 
90
548
class KnitTests(TestCaseWithTransport):
91
549
    """Class containing knit test helper routines."""
92
550
 
763
1221
        t = get_transport('.')
764
1222
        t.put_bytes('test.kndx', '# not really a knit header\n\n')
765
1223
 
766
 
        self.assertRaises(errors.KnitHeaderError, self.make_test_knit)
 
1224
        self.assertRaises(KnitHeaderError, self.make_test_knit)