/brz/remove-bazaar

To get this branch, use:
bzr branch http://gegoxaren.bato24.eu/bzr/brz/remove-bazaar

« back to all changes in this revision

Viewing changes to bzrlib/tests/test_knit.py

  • Committer: Wouter van Heyst
  • Date: 2007-07-21 15:32:37 UTC
  • mto: This revision was merged to the branch mainline in revision 2645.
  • Revision ID: larstiq@larstiq.dyndns.org-20070721153237-rh7v5kz6rh3r2mza
As Aaron explained #127115 is more general, failing whenever other's basis is an ancestor of this' basis.

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
# Copyright (C) 2005, 2006, 2007 Canonical Ltd
 
2
#
 
3
# This program is free software; you can redistribute it and/or modify
 
4
# it under the terms of the GNU General Public License as published by
 
5
# the Free Software Foundation; either version 2 of the License, or
 
6
# (at your option) any later version.
 
7
#
 
8
# This program is distributed in the hope that it will be useful,
 
9
# but WITHOUT ANY WARRANTY; without even the implied warranty of
 
10
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
11
# GNU General Public License for more details.
 
12
#
 
13
# You should have received a copy of the GNU General Public License
 
14
# along with this program; if not, write to the Free Software
 
15
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
 
16
 
 
17
"""Tests for Knit data structure"""
 
18
 
 
19
from cStringIO import StringIO
 
20
import difflib
 
21
import gzip
 
22
import sha
 
23
import sys
 
24
 
 
25
from bzrlib import (
 
26
    errors,
 
27
    generate_ids,
 
28
    knit,
 
29
    )
 
30
from bzrlib.errors import (
 
31
    RevisionAlreadyPresent,
 
32
    KnitHeaderError,
 
33
    RevisionNotPresent,
 
34
    NoSuchFile,
 
35
    )
 
36
from bzrlib.knit import (
 
37
    KnitContent,
 
38
    KnitVersionedFile,
 
39
    KnitPlainFactory,
 
40
    KnitAnnotateFactory,
 
41
    _KnitData,
 
42
    _KnitIndex,
 
43
    WeaveToKnit,
 
44
    KnitSequenceMatcher,
 
45
    )
 
46
from bzrlib.osutils import split_lines
 
47
from bzrlib.tests import TestCase, TestCaseWithTransport, Feature
 
48
from bzrlib.transport import TransportLogger, get_transport
 
49
from bzrlib.transport.memory import MemoryTransport
 
50
from bzrlib.weave import Weave
 
51
 
 
52
 
 
53
class _CompiledKnitFeature(Feature):
 
54
 
 
55
    def _probe(self):
 
56
        try:
 
57
            import bzrlib._knit_load_data_c
 
58
        except ImportError:
 
59
            return False
 
60
        return True
 
61
 
 
62
    def feature_name(self):
 
63
        return 'bzrlib._knit_load_data_c'
 
64
 
 
65
CompiledKnitFeature = _CompiledKnitFeature()
 
66
 
 
67
 
 
68
class KnitContentTests(TestCase):
 
69
 
 
70
    def test_constructor(self):
 
71
        content = KnitContent([])
 
72
 
 
73
    def test_text(self):
 
74
        content = KnitContent([])
 
75
        self.assertEqual(content.text(), [])
 
76
 
 
77
        content = KnitContent([("origin1", "text1"), ("origin2", "text2")])
 
78
        self.assertEqual(content.text(), ["text1", "text2"])
 
79
 
 
80
    def test_annotate(self):
 
81
        content = KnitContent([])
 
82
        self.assertEqual(content.annotate(), [])
 
83
 
 
84
        content = KnitContent([("origin1", "text1"), ("origin2", "text2")])
 
85
        self.assertEqual(content.annotate(),
 
86
            [("origin1", "text1"), ("origin2", "text2")])
 
87
 
 
88
    def test_annotate_iter(self):
 
89
        content = KnitContent([])
 
90
        it = content.annotate_iter()
 
91
        self.assertRaises(StopIteration, it.next)
 
92
 
 
93
        content = KnitContent([("origin1", "text1"), ("origin2", "text2")])
 
94
        it = content.annotate_iter()
 
95
        self.assertEqual(it.next(), ("origin1", "text1"))
 
96
        self.assertEqual(it.next(), ("origin2", "text2"))
 
97
        self.assertRaises(StopIteration, it.next)
 
98
 
 
99
    def test_copy(self):
 
100
        content = KnitContent([("origin1", "text1"), ("origin2", "text2")])
 
101
        copy = content.copy()
 
102
        self.assertIsInstance(copy, KnitContent)
 
103
        self.assertEqual(copy.annotate(),
 
104
            [("origin1", "text1"), ("origin2", "text2")])
 
105
 
 
106
    def test_line_delta(self):
 
107
        content1 = KnitContent([("", "a"), ("", "b")])
 
108
        content2 = KnitContent([("", "a"), ("", "a"), ("", "c")])
 
109
        self.assertEqual(content1.line_delta(content2),
 
110
            [(1, 2, 2, [("", "a"), ("", "c")])])
 
111
 
 
112
    def test_line_delta_iter(self):
 
113
        content1 = KnitContent([("", "a"), ("", "b")])
 
114
        content2 = KnitContent([("", "a"), ("", "a"), ("", "c")])
 
115
        it = content1.line_delta_iter(content2)
 
116
        self.assertEqual(it.next(), (1, 2, 2, [("", "a"), ("", "c")]))
 
117
        self.assertRaises(StopIteration, it.next)
 
118
 
 
119
 
 
120
class MockTransport(object):
 
121
 
 
122
    def __init__(self, file_lines=None):
 
123
        self.file_lines = file_lines
 
124
        self.calls = []
 
125
        # We have no base directory for the MockTransport
 
126
        self.base = ''
 
127
 
 
128
    def get(self, filename):
 
129
        if self.file_lines is None:
 
130
            raise NoSuchFile(filename)
 
131
        else:
 
132
            return StringIO("\n".join(self.file_lines))
 
133
 
 
134
    def readv(self, relpath, offsets):
 
135
        fp = self.get(relpath)
 
136
        for offset, size in offsets:
 
137
            fp.seek(offset)
 
138
            yield offset, fp.read(size)
 
139
 
 
140
    def __getattr__(self, name):
 
141
        def queue_call(*args, **kwargs):
 
142
            self.calls.append((name, args, kwargs))
 
143
        return queue_call
 
144
 
 
145
 
 
146
class LowLevelKnitDataTests(TestCase):
 
147
 
 
148
    def create_gz_content(self, text):
 
149
        sio = StringIO()
 
150
        gz_file = gzip.GzipFile(mode='wb', fileobj=sio)
 
151
        gz_file.write(text)
 
152
        gz_file.close()
 
153
        return sio.getvalue()
 
154
 
 
155
    def test_valid_knit_data(self):
 
156
        sha1sum = sha.new('foo\nbar\n').hexdigest()
 
157
        gz_txt = self.create_gz_content('version rev-id-1 2 %s\n'
 
158
                                        'foo\n'
 
159
                                        'bar\n'
 
160
                                        'end rev-id-1\n'
 
161
                                        % (sha1sum,))
 
162
        transport = MockTransport([gz_txt])
 
163
        data = _KnitData(transport, 'filename', mode='r')
 
164
        records = [('rev-id-1', 0, len(gz_txt))]
 
165
 
 
166
        contents = data.read_records(records)
 
167
        self.assertEqual({'rev-id-1':(['foo\n', 'bar\n'], sha1sum)}, contents)
 
168
 
 
169
        raw_contents = list(data.read_records_iter_raw(records))
 
170
        self.assertEqual([('rev-id-1', gz_txt)], raw_contents)
 
171
 
 
172
    def test_not_enough_lines(self):
 
173
        sha1sum = sha.new('foo\n').hexdigest()
 
174
        # record says 2 lines data says 1
 
175
        gz_txt = self.create_gz_content('version rev-id-1 2 %s\n'
 
176
                                        'foo\n'
 
177
                                        'end rev-id-1\n'
 
178
                                        % (sha1sum,))
 
179
        transport = MockTransport([gz_txt])
 
180
        data = _KnitData(transport, 'filename', mode='r')
 
181
        records = [('rev-id-1', 0, len(gz_txt))]
 
182
        self.assertRaises(errors.KnitCorrupt, data.read_records, records)
 
183
 
 
184
        # read_records_iter_raw won't detect that sort of mismatch/corruption
 
185
        raw_contents = list(data.read_records_iter_raw(records))
 
186
        self.assertEqual([('rev-id-1', gz_txt)], raw_contents)
 
187
 
 
188
    def test_too_many_lines(self):
 
189
        sha1sum = sha.new('foo\nbar\n').hexdigest()
 
190
        # record says 1 lines data says 2
 
191
        gz_txt = self.create_gz_content('version rev-id-1 1 %s\n'
 
192
                                        'foo\n'
 
193
                                        'bar\n'
 
194
                                        'end rev-id-1\n'
 
195
                                        % (sha1sum,))
 
196
        transport = MockTransport([gz_txt])
 
197
        data = _KnitData(transport, 'filename', mode='r')
 
198
        records = [('rev-id-1', 0, len(gz_txt))]
 
199
        self.assertRaises(errors.KnitCorrupt, data.read_records, records)
 
200
 
 
201
        # read_records_iter_raw won't detect that sort of mismatch/corruption
 
202
        raw_contents = list(data.read_records_iter_raw(records))
 
203
        self.assertEqual([('rev-id-1', gz_txt)], raw_contents)
 
204
 
 
205
    def test_mismatched_version_id(self):
 
206
        sha1sum = sha.new('foo\nbar\n').hexdigest()
 
207
        gz_txt = self.create_gz_content('version rev-id-1 2 %s\n'
 
208
                                        'foo\n'
 
209
                                        'bar\n'
 
210
                                        'end rev-id-1\n'
 
211
                                        % (sha1sum,))
 
212
        transport = MockTransport([gz_txt])
 
213
        data = _KnitData(transport, 'filename', mode='r')
 
214
        # We are asking for rev-id-2, but the data is rev-id-1
 
215
        records = [('rev-id-2', 0, len(gz_txt))]
 
216
        self.assertRaises(errors.KnitCorrupt, data.read_records, records)
 
217
 
 
218
        # read_records_iter_raw will notice if we request the wrong version.
 
219
        self.assertRaises(errors.KnitCorrupt, list,
 
220
                          data.read_records_iter_raw(records))
 
221
 
 
222
    def test_uncompressed_data(self):
 
223
        sha1sum = sha.new('foo\nbar\n').hexdigest()
 
224
        txt = ('version rev-id-1 2 %s\n'
 
225
               'foo\n'
 
226
               'bar\n'
 
227
               'end rev-id-1\n'
 
228
               % (sha1sum,))
 
229
        transport = MockTransport([txt])
 
230
        data = _KnitData(transport, 'filename', mode='r')
 
231
        records = [('rev-id-1', 0, len(txt))]
 
232
 
 
233
        # We don't have valid gzip data ==> corrupt
 
234
        self.assertRaises(errors.KnitCorrupt, data.read_records, records)
 
235
 
 
236
        # read_records_iter_raw will notice the bad data
 
237
        self.assertRaises(errors.KnitCorrupt, list,
 
238
                          data.read_records_iter_raw(records))
 
239
 
 
240
    def test_corrupted_data(self):
 
241
        sha1sum = sha.new('foo\nbar\n').hexdigest()
 
242
        gz_txt = self.create_gz_content('version rev-id-1 2 %s\n'
 
243
                                        'foo\n'
 
244
                                        'bar\n'
 
245
                                        'end rev-id-1\n'
 
246
                                        % (sha1sum,))
 
247
        # Change 2 bytes in the middle to \xff
 
248
        gz_txt = gz_txt[:10] + '\xff\xff' + gz_txt[12:]
 
249
        transport = MockTransport([gz_txt])
 
250
        data = _KnitData(transport, 'filename', mode='r')
 
251
        records = [('rev-id-1', 0, len(gz_txt))]
 
252
 
 
253
        self.assertRaises(errors.KnitCorrupt, data.read_records, records)
 
254
 
 
255
        # read_records_iter_raw will notice if we request the wrong version.
 
256
        self.assertRaises(errors.KnitCorrupt, list,
 
257
                          data.read_records_iter_raw(records))
 
258
 
 
259
 
 
260
class LowLevelKnitIndexTests(TestCase):
 
261
 
 
262
    def get_knit_index(self, *args, **kwargs):
 
263
        orig = knit._load_data
 
264
        def reset():
 
265
            knit._load_data = orig
 
266
        self.addCleanup(reset)
 
267
        from bzrlib._knit_load_data_py import _load_data_py
 
268
        knit._load_data = _load_data_py
 
269
        return _KnitIndex(*args, **kwargs)
 
270
 
 
271
    def test_no_such_file(self):
 
272
        transport = MockTransport()
 
273
 
 
274
        self.assertRaises(NoSuchFile, self.get_knit_index,
 
275
                          transport, "filename", "r")
 
276
        self.assertRaises(NoSuchFile, self.get_knit_index,
 
277
                          transport, "filename", "w", create=False)
 
278
 
 
279
    def test_create_file(self):
 
280
        transport = MockTransport()
 
281
 
 
282
        index = self.get_knit_index(transport, "filename", "w",
 
283
            file_mode="wb", create=True)
 
284
        self.assertEqual(
 
285
                ("put_bytes_non_atomic",
 
286
                    ("filename", index.HEADER), {"mode": "wb"}),
 
287
                transport.calls.pop(0))
 
288
 
 
289
    def test_delay_create_file(self):
 
290
        transport = MockTransport()
 
291
 
 
292
        index = self.get_knit_index(transport, "filename", "w",
 
293
            create=True, file_mode="wb", create_parent_dir=True,
 
294
            delay_create=True, dir_mode=0777)
 
295
        self.assertEqual([], transport.calls)
 
296
 
 
297
        index.add_versions([])
 
298
        name, (filename, f), kwargs = transport.calls.pop(0)
 
299
        self.assertEqual("put_file_non_atomic", name)
 
300
        self.assertEqual(
 
301
            {"dir_mode": 0777, "create_parent_dir": True, "mode": "wb"},
 
302
            kwargs)
 
303
        self.assertEqual("filename", filename)
 
304
        self.assertEqual(index.HEADER, f.read())
 
305
 
 
306
        index.add_versions([])
 
307
        self.assertEqual(("append_bytes", ("filename", ""), {}),
 
308
            transport.calls.pop(0))
 
309
 
 
310
    def test_read_utf8_version_id(self):
 
311
        unicode_revision_id = u"version-\N{CYRILLIC CAPITAL LETTER A}"
 
312
        utf8_revision_id = unicode_revision_id.encode('utf-8')
 
313
        transport = MockTransport([
 
314
            _KnitIndex.HEADER,
 
315
            '%s option 0 1 :' % (utf8_revision_id,)
 
316
            ])
 
317
        index = self.get_knit_index(transport, "filename", "r")
 
318
        # _KnitIndex is a private class, and deals in utf8 revision_ids, not
 
319
        # Unicode revision_ids.
 
320
        self.assertTrue(index.has_version(utf8_revision_id))
 
321
        self.assertFalse(index.has_version(unicode_revision_id))
 
322
 
 
323
    def test_read_utf8_parents(self):
 
324
        unicode_revision_id = u"version-\N{CYRILLIC CAPITAL LETTER A}"
 
325
        utf8_revision_id = unicode_revision_id.encode('utf-8')
 
326
        transport = MockTransport([
 
327
            _KnitIndex.HEADER,
 
328
            "version option 0 1 .%s :" % (utf8_revision_id,)
 
329
            ])
 
330
        index = self.get_knit_index(transport, "filename", "r")
 
331
        self.assertEqual([utf8_revision_id],
 
332
            index.get_parents_with_ghosts("version"))
 
333
 
 
334
    def test_read_ignore_corrupted_lines(self):
 
335
        transport = MockTransport([
 
336
            _KnitIndex.HEADER,
 
337
            "corrupted",
 
338
            "corrupted options 0 1 .b .c ",
 
339
            "version options 0 1 :"
 
340
            ])
 
341
        index = self.get_knit_index(transport, "filename", "r")
 
342
        self.assertEqual(1, index.num_versions())
 
343
        self.assertTrue(index.has_version("version"))
 
344
 
 
345
    def test_read_corrupted_header(self):
 
346
        transport = MockTransport(['not a bzr knit index header\n'])
 
347
        self.assertRaises(KnitHeaderError,
 
348
            self.get_knit_index, transport, "filename", "r")
 
349
 
 
350
    def test_read_duplicate_entries(self):
 
351
        transport = MockTransport([
 
352
            _KnitIndex.HEADER,
 
353
            "parent options 0 1 :",
 
354
            "version options1 0 1 0 :",
 
355
            "version options2 1 2 .other :",
 
356
            "version options3 3 4 0 .other :"
 
357
            ])
 
358
        index = self.get_knit_index(transport, "filename", "r")
 
359
        self.assertEqual(2, index.num_versions())
 
360
        self.assertEqual(1, index.lookup("version"))
 
361
        self.assertEqual((3, 4), index.get_position("version"))
 
362
        self.assertEqual(["options3"], index.get_options("version"))
 
363
        self.assertEqual(["parent", "other"],
 
364
            index.get_parents_with_ghosts("version"))
 
365
 
 
366
    def test_read_compressed_parents(self):
 
367
        transport = MockTransport([
 
368
            _KnitIndex.HEADER,
 
369
            "a option 0 1 :",
 
370
            "b option 0 1 0 :",
 
371
            "c option 0 1 1 0 :",
 
372
            ])
 
373
        index = self.get_knit_index(transport, "filename", "r")
 
374
        self.assertEqual(["a"], index.get_parents("b"))
 
375
        self.assertEqual(["b", "a"], index.get_parents("c"))
 
376
 
 
377
    def test_write_utf8_version_id(self):
 
378
        unicode_revision_id = u"version-\N{CYRILLIC CAPITAL LETTER A}"
 
379
        utf8_revision_id = unicode_revision_id.encode('utf-8')
 
380
        transport = MockTransport([
 
381
            _KnitIndex.HEADER
 
382
            ])
 
383
        index = self.get_knit_index(transport, "filename", "r")
 
384
        index.add_version(utf8_revision_id, ["option"], 0, 1, [])
 
385
        self.assertEqual(("append_bytes", ("filename",
 
386
            "\n%s option 0 1  :" % (utf8_revision_id,)),
 
387
            {}),
 
388
            transport.calls.pop(0))
 
389
 
 
390
    def test_write_utf8_parents(self):
 
391
        unicode_revision_id = u"version-\N{CYRILLIC CAPITAL LETTER A}"
 
392
        utf8_revision_id = unicode_revision_id.encode('utf-8')
 
393
        transport = MockTransport([
 
394
            _KnitIndex.HEADER
 
395
            ])
 
396
        index = self.get_knit_index(transport, "filename", "r")
 
397
        index.add_version("version", ["option"], 0, 1, [utf8_revision_id])
 
398
        self.assertEqual(("append_bytes", ("filename",
 
399
            "\nversion option 0 1 .%s :" % (utf8_revision_id,)),
 
400
            {}),
 
401
            transport.calls.pop(0))
 
402
 
 
403
    def test_get_graph(self):
 
404
        transport = MockTransport()
 
405
        index = self.get_knit_index(transport, "filename", "w", create=True)
 
406
        self.assertEqual([], index.get_graph())
 
407
 
 
408
        index.add_version("a", ["option"], 0, 1, ["b"])
 
409
        self.assertEqual([("a", ["b"])], index.get_graph())
 
410
 
 
411
        index.add_version("c", ["option"], 0, 1, ["d"])
 
412
        self.assertEqual([("a", ["b"]), ("c", ["d"])],
 
413
            sorted(index.get_graph()))
 
414
 
 
415
    def test_get_ancestry(self):
 
416
        transport = MockTransport([
 
417
            _KnitIndex.HEADER,
 
418
            "a option 0 1 :",
 
419
            "b option 0 1 0 .e :",
 
420
            "c option 0 1 1 0 :",
 
421
            "d option 0 1 2 .f :"
 
422
            ])
 
423
        index = self.get_knit_index(transport, "filename", "r")
 
424
 
 
425
        self.assertEqual([], index.get_ancestry([]))
 
426
        self.assertEqual(["a"], index.get_ancestry(["a"]))
 
427
        self.assertEqual(["a", "b"], index.get_ancestry(["b"]))
 
428
        self.assertEqual(["a", "b", "c"], index.get_ancestry(["c"]))
 
429
        self.assertEqual(["a", "b", "c", "d"], index.get_ancestry(["d"]))
 
430
        self.assertEqual(["a", "b"], index.get_ancestry(["a", "b"]))
 
431
        self.assertEqual(["a", "b", "c"], index.get_ancestry(["a", "c"]))
 
432
 
 
433
        self.assertRaises(RevisionNotPresent, index.get_ancestry, ["e"])
 
434
 
 
435
    def test_get_ancestry_with_ghosts(self):
 
436
        transport = MockTransport([
 
437
            _KnitIndex.HEADER,
 
438
            "a option 0 1 :",
 
439
            "b option 0 1 0 .e :",
 
440
            "c option 0 1 0 .f .g :",
 
441
            "d option 0 1 2 .h .j .k :"
 
442
            ])
 
443
        index = self.get_knit_index(transport, "filename", "r")
 
444
 
 
445
        self.assertEqual([], index.get_ancestry_with_ghosts([]))
 
446
        self.assertEqual(["a"], index.get_ancestry_with_ghosts(["a"]))
 
447
        self.assertEqual(["a", "e", "b"],
 
448
            index.get_ancestry_with_ghosts(["b"]))
 
449
        self.assertEqual(["a", "g", "f", "c"],
 
450
            index.get_ancestry_with_ghosts(["c"]))
 
451
        self.assertEqual(["a", "g", "f", "c", "k", "j", "h", "d"],
 
452
            index.get_ancestry_with_ghosts(["d"]))
 
453
        self.assertEqual(["a", "e", "b"],
 
454
            index.get_ancestry_with_ghosts(["a", "b"]))
 
455
        self.assertEqual(["a", "g", "f", "c"],
 
456
            index.get_ancestry_with_ghosts(["a", "c"]))
 
457
        self.assertEqual(
 
458
            ["a", "g", "f", "c", "e", "b", "k", "j", "h", "d"],
 
459
            index.get_ancestry_with_ghosts(["b", "d"]))
 
460
 
 
461
        self.assertRaises(RevisionNotPresent,
 
462
            index.get_ancestry_with_ghosts, ["e"])
 
463
 
 
464
    def test_num_versions(self):
 
465
        transport = MockTransport([
 
466
            _KnitIndex.HEADER
 
467
            ])
 
468
        index = self.get_knit_index(transport, "filename", "r")
 
469
 
 
470
        self.assertEqual(0, index.num_versions())
 
471
        self.assertEqual(0, len(index))
 
472
 
 
473
        index.add_version("a", ["option"], 0, 1, [])
 
474
        self.assertEqual(1, index.num_versions())
 
475
        self.assertEqual(1, len(index))
 
476
 
 
477
        index.add_version("a", ["option2"], 1, 2, [])
 
478
        self.assertEqual(1, index.num_versions())
 
479
        self.assertEqual(1, len(index))
 
480
 
 
481
        index.add_version("b", ["option"], 0, 1, [])
 
482
        self.assertEqual(2, index.num_versions())
 
483
        self.assertEqual(2, len(index))
 
484
 
 
485
    def test_get_versions(self):
 
486
        transport = MockTransport([
 
487
            _KnitIndex.HEADER
 
488
            ])
 
489
        index = self.get_knit_index(transport, "filename", "r")
 
490
 
 
491
        self.assertEqual([], index.get_versions())
 
492
 
 
493
        index.add_version("a", ["option"], 0, 1, [])
 
494
        self.assertEqual(["a"], index.get_versions())
 
495
 
 
496
        index.add_version("a", ["option"], 0, 1, [])
 
497
        self.assertEqual(["a"], index.get_versions())
 
498
 
 
499
        index.add_version("b", ["option"], 0, 1, [])
 
500
        self.assertEqual(["a", "b"], index.get_versions())
 
501
 
 
502
    def test_idx_to_name(self):
 
503
        transport = MockTransport([
 
504
            _KnitIndex.HEADER,
 
505
            "a option 0 1 :",
 
506
            "b option 0 1 :"
 
507
            ])
 
508
        index = self.get_knit_index(transport, "filename", "r")
 
509
 
 
510
        self.assertEqual("a", index.idx_to_name(0))
 
511
        self.assertEqual("b", index.idx_to_name(1))
 
512
        self.assertEqual("b", index.idx_to_name(-1))
 
513
        self.assertEqual("a", index.idx_to_name(-2))
 
514
 
 
515
    def test_lookup(self):
 
516
        transport = MockTransport([
 
517
            _KnitIndex.HEADER,
 
518
            "a option 0 1 :",
 
519
            "b option 0 1 :"
 
520
            ])
 
521
        index = self.get_knit_index(transport, "filename", "r")
 
522
 
 
523
        self.assertEqual(0, index.lookup("a"))
 
524
        self.assertEqual(1, index.lookup("b"))
 
525
 
 
526
    def test_add_version(self):
 
527
        transport = MockTransport([
 
528
            _KnitIndex.HEADER
 
529
            ])
 
530
        index = self.get_knit_index(transport, "filename", "r")
 
531
 
 
532
        index.add_version("a", ["option"], 0, 1, ["b"])
 
533
        self.assertEqual(("append_bytes",
 
534
            ("filename", "\na option 0 1 .b :"),
 
535
            {}), transport.calls.pop(0))
 
536
        self.assertTrue(index.has_version("a"))
 
537
        self.assertEqual(1, index.num_versions())
 
538
        self.assertEqual((0, 1), index.get_position("a"))
 
539
        self.assertEqual(["option"], index.get_options("a"))
 
540
        self.assertEqual(["b"], index.get_parents_with_ghosts("a"))
 
541
 
 
542
        index.add_version("a", ["opt"], 1, 2, ["c"])
 
543
        self.assertEqual(("append_bytes",
 
544
            ("filename", "\na opt 1 2 .c :"),
 
545
            {}), transport.calls.pop(0))
 
546
        self.assertTrue(index.has_version("a"))
 
547
        self.assertEqual(1, index.num_versions())
 
548
        self.assertEqual((1, 2), index.get_position("a"))
 
549
        self.assertEqual(["opt"], index.get_options("a"))
 
550
        self.assertEqual(["c"], index.get_parents_with_ghosts("a"))
 
551
 
 
552
        index.add_version("b", ["option"], 2, 3, ["a"])
 
553
        self.assertEqual(("append_bytes",
 
554
            ("filename", "\nb option 2 3 0 :"),
 
555
            {}), transport.calls.pop(0))
 
556
        self.assertTrue(index.has_version("b"))
 
557
        self.assertEqual(2, index.num_versions())
 
558
        self.assertEqual((2, 3), index.get_position("b"))
 
559
        self.assertEqual(["option"], index.get_options("b"))
 
560
        self.assertEqual(["a"], index.get_parents_with_ghosts("b"))
 
561
 
 
562
    def test_add_versions(self):
 
563
        transport = MockTransport([
 
564
            _KnitIndex.HEADER
 
565
            ])
 
566
        index = self.get_knit_index(transport, "filename", "r")
 
567
 
 
568
        index.add_versions([
 
569
            ("a", ["option"], 0, 1, ["b"]),
 
570
            ("a", ["opt"], 1, 2, ["c"]),
 
571
            ("b", ["option"], 2, 3, ["a"])
 
572
            ])
 
573
        self.assertEqual(("append_bytes", ("filename",
 
574
            "\na option 0 1 .b :"
 
575
            "\na opt 1 2 .c :"
 
576
            "\nb option 2 3 0 :"
 
577
            ), {}), transport.calls.pop(0))
 
578
        self.assertTrue(index.has_version("a"))
 
579
        self.assertTrue(index.has_version("b"))
 
580
        self.assertEqual(2, index.num_versions())
 
581
        self.assertEqual((1, 2), index.get_position("a"))
 
582
        self.assertEqual((2, 3), index.get_position("b"))
 
583
        self.assertEqual(["opt"], index.get_options("a"))
 
584
        self.assertEqual(["option"], index.get_options("b"))
 
585
        self.assertEqual(["c"], index.get_parents_with_ghosts("a"))
 
586
        self.assertEqual(["a"], index.get_parents_with_ghosts("b"))
 
587
 
 
588
    def test_delay_create_and_add_versions(self):
 
589
        transport = MockTransport()
 
590
 
 
591
        index = self.get_knit_index(transport, "filename", "w",
 
592
            create=True, file_mode="wb", create_parent_dir=True,
 
593
            delay_create=True, dir_mode=0777)
 
594
        self.assertEqual([], transport.calls)
 
595
 
 
596
        index.add_versions([
 
597
            ("a", ["option"], 0, 1, ["b"]),
 
598
            ("a", ["opt"], 1, 2, ["c"]),
 
599
            ("b", ["option"], 2, 3, ["a"])
 
600
            ])
 
601
        name, (filename, f), kwargs = transport.calls.pop(0)
 
602
        self.assertEqual("put_file_non_atomic", name)
 
603
        self.assertEqual(
 
604
            {"dir_mode": 0777, "create_parent_dir": True, "mode": "wb"},
 
605
            kwargs)
 
606
        self.assertEqual("filename", filename)
 
607
        self.assertEqual(
 
608
            index.HEADER +
 
609
            "\na option 0 1 .b :"
 
610
            "\na opt 1 2 .c :"
 
611
            "\nb option 2 3 0 :",
 
612
            f.read())
 
613
 
 
614
    def test_has_version(self):
 
615
        transport = MockTransport([
 
616
            _KnitIndex.HEADER,
 
617
            "a option 0 1 :"
 
618
            ])
 
619
        index = self.get_knit_index(transport, "filename", "r")
 
620
 
 
621
        self.assertTrue(index.has_version("a"))
 
622
        self.assertFalse(index.has_version("b"))
 
623
 
 
624
    def test_get_position(self):
 
625
        transport = MockTransport([
 
626
            _KnitIndex.HEADER,
 
627
            "a option 0 1 :",
 
628
            "b option 1 2 :"
 
629
            ])
 
630
        index = self.get_knit_index(transport, "filename", "r")
 
631
 
 
632
        self.assertEqual((0, 1), index.get_position("a"))
 
633
        self.assertEqual((1, 2), index.get_position("b"))
 
634
 
 
635
    def test_get_method(self):
 
636
        transport = MockTransport([
 
637
            _KnitIndex.HEADER,
 
638
            "a fulltext,unknown 0 1 :",
 
639
            "b unknown,line-delta 1 2 :",
 
640
            "c bad 3 4 :"
 
641
            ])
 
642
        index = self.get_knit_index(transport, "filename", "r")
 
643
 
 
644
        self.assertEqual("fulltext", index.get_method("a"))
 
645
        self.assertEqual("line-delta", index.get_method("b"))
 
646
        self.assertRaises(errors.KnitIndexUnknownMethod, index.get_method, "c")
 
647
 
 
648
    def test_get_options(self):
 
649
        transport = MockTransport([
 
650
            _KnitIndex.HEADER,
 
651
            "a opt1 0 1 :",
 
652
            "b opt2,opt3 1 2 :"
 
653
            ])
 
654
        index = self.get_knit_index(transport, "filename", "r")
 
655
 
 
656
        self.assertEqual(["opt1"], index.get_options("a"))
 
657
        self.assertEqual(["opt2", "opt3"], index.get_options("b"))
 
658
 
 
659
    def test_get_parents(self):
 
660
        transport = MockTransport([
 
661
            _KnitIndex.HEADER,
 
662
            "a option 0 1 :",
 
663
            "b option 1 2 0 .c :",
 
664
            "c option 1 2 1 0 .e :"
 
665
            ])
 
666
        index = self.get_knit_index(transport, "filename", "r")
 
667
 
 
668
        self.assertEqual([], index.get_parents("a"))
 
669
        self.assertEqual(["a", "c"], index.get_parents("b"))
 
670
        self.assertEqual(["b", "a"], index.get_parents("c"))
 
671
 
 
672
    def test_get_parents_with_ghosts(self):
 
673
        transport = MockTransport([
 
674
            _KnitIndex.HEADER,
 
675
            "a option 0 1 :",
 
676
            "b option 1 2 0 .c :",
 
677
            "c option 1 2 1 0 .e :"
 
678
            ])
 
679
        index = self.get_knit_index(transport, "filename", "r")
 
680
 
 
681
        self.assertEqual([], index.get_parents_with_ghosts("a"))
 
682
        self.assertEqual(["a", "c"], index.get_parents_with_ghosts("b"))
 
683
        self.assertEqual(["b", "a", "e"],
 
684
            index.get_parents_with_ghosts("c"))
 
685
 
 
686
    def test_check_versions_present(self):
 
687
        transport = MockTransport([
 
688
            _KnitIndex.HEADER,
 
689
            "a option 0 1 :",
 
690
            "b option 0 1 :"
 
691
            ])
 
692
        index = self.get_knit_index(transport, "filename", "r")
 
693
 
 
694
        check = index.check_versions_present
 
695
 
 
696
        check([])
 
697
        check(["a"])
 
698
        check(["b"])
 
699
        check(["a", "b"])
 
700
        self.assertRaises(RevisionNotPresent, check, ["c"])
 
701
        self.assertRaises(RevisionNotPresent, check, ["a", "b", "c"])
 
702
 
 
703
    def test_impossible_parent(self):
 
704
        """Test we get KnitCorrupt if the parent couldn't possibly exist."""
 
705
        transport = MockTransport([
 
706
            _KnitIndex.HEADER,
 
707
            "a option 0 1 :",
 
708
            "b option 0 1 4 :"  # We don't have a 4th record
 
709
            ])
 
710
        try:
 
711
            self.assertRaises(errors.KnitCorrupt,
 
712
                              self.get_knit_index, transport, 'filename', 'r')
 
713
        except TypeError, e:
 
714
            if (str(e) == ('exceptions must be strings, classes, or instances,'
 
715
                           ' not exceptions.IndexError')
 
716
                and sys.version_info[0:2] >= (2,5)):
 
717
                self.knownFailure('Pyrex <0.9.5 fails with TypeError when'
 
718
                                  ' raising new style exceptions with python'
 
719
                                  ' >=2.5')
 
720
            else:
 
721
                raise
 
722
 
 
723
    def test_corrupted_parent(self):
 
724
        transport = MockTransport([
 
725
            _KnitIndex.HEADER,
 
726
            "a option 0 1 :",
 
727
            "b option 0 1 :",
 
728
            "c option 0 1 1v :", # Can't have a parent of '1v'
 
729
            ])
 
730
        try:
 
731
            self.assertRaises(errors.KnitCorrupt,
 
732
                              self.get_knit_index, transport, 'filename', 'r')
 
733
        except TypeError, e:
 
734
            if (str(e) == ('exceptions must be strings, classes, or instances,'
 
735
                           ' not exceptions.ValueError')
 
736
                and sys.version_info[0:2] >= (2,5)):
 
737
                self.knownFailure('Pyrex <0.9.5 fails with TypeError when'
 
738
                                  ' raising new style exceptions with python'
 
739
                                  ' >=2.5')
 
740
            else:
 
741
                raise
 
742
 
 
743
    def test_corrupted_parent_in_list(self):
 
744
        transport = MockTransport([
 
745
            _KnitIndex.HEADER,
 
746
            "a option 0 1 :",
 
747
            "b option 0 1 :",
 
748
            "c option 0 1 1 v :", # Can't have a parent of 'v'
 
749
            ])
 
750
        try:
 
751
            self.assertRaises(errors.KnitCorrupt,
 
752
                              self.get_knit_index, transport, 'filename', 'r')
 
753
        except TypeError, e:
 
754
            if (str(e) == ('exceptions must be strings, classes, or instances,'
 
755
                           ' not exceptions.ValueError')
 
756
                and sys.version_info[0:2] >= (2,5)):
 
757
                self.knownFailure('Pyrex <0.9.5 fails with TypeError when'
 
758
                                  ' raising new style exceptions with python'
 
759
                                  ' >=2.5')
 
760
            else:
 
761
                raise
 
762
 
 
763
    def test_invalid_position(self):
 
764
        transport = MockTransport([
 
765
            _KnitIndex.HEADER,
 
766
            "a option 1v 1 :",
 
767
            ])
 
768
        try:
 
769
            self.assertRaises(errors.KnitCorrupt,
 
770
                              self.get_knit_index, transport, 'filename', 'r')
 
771
        except TypeError, e:
 
772
            if (str(e) == ('exceptions must be strings, classes, or instances,'
 
773
                           ' not exceptions.ValueError')
 
774
                and sys.version_info[0:2] >= (2,5)):
 
775
                self.knownFailure('Pyrex <0.9.5 fails with TypeError when'
 
776
                                  ' raising new style exceptions with python'
 
777
                                  ' >=2.5')
 
778
            else:
 
779
                raise
 
780
 
 
781
    def test_invalid_size(self):
 
782
        transport = MockTransport([
 
783
            _KnitIndex.HEADER,
 
784
            "a option 1 1v :",
 
785
            ])
 
786
        try:
 
787
            self.assertRaises(errors.KnitCorrupt,
 
788
                              self.get_knit_index, transport, 'filename', 'r')
 
789
        except TypeError, e:
 
790
            if (str(e) == ('exceptions must be strings, classes, or instances,'
 
791
                           ' not exceptions.ValueError')
 
792
                and sys.version_info[0:2] >= (2,5)):
 
793
                self.knownFailure('Pyrex <0.9.5 fails with TypeError when'
 
794
                                  ' raising new style exceptions with python'
 
795
                                  ' >=2.5')
 
796
            else:
 
797
                raise
 
798
 
 
799
    def test_short_line(self):
 
800
        transport = MockTransport([
 
801
            _KnitIndex.HEADER,
 
802
            "a option 0 10  :",
 
803
            "b option 10 10 0", # This line isn't terminated, ignored
 
804
            ])
 
805
        index = self.get_knit_index(transport, "filename", "r")
 
806
        self.assertEqual(['a'], index.get_versions())
 
807
 
 
808
    def test_skip_incomplete_record(self):
 
809
        # A line with bogus data should just be skipped
 
810
        transport = MockTransport([
 
811
            _KnitIndex.HEADER,
 
812
            "a option 0 10  :",
 
813
            "b option 10 10 0", # This line isn't terminated, ignored
 
814
            "c option 20 10 0 :", # Properly terminated, and starts with '\n'
 
815
            ])
 
816
        index = self.get_knit_index(transport, "filename", "r")
 
817
        self.assertEqual(['a', 'c'], index.get_versions())
 
818
 
 
819
    def test_trailing_characters(self):
 
820
        # A line with bogus data should just be skipped
 
821
        transport = MockTransport([
 
822
            _KnitIndex.HEADER,
 
823
            "a option 0 10  :",
 
824
            "b option 10 10 0 :a", # This line has extra trailing characters
 
825
            "c option 20 10 0 :", # Properly terminated, and starts with '\n'
 
826
            ])
 
827
        index = self.get_knit_index(transport, "filename", "r")
 
828
        self.assertEqual(['a', 'c'], index.get_versions())
 
829
 
 
830
 
 
831
class LowLevelKnitIndexTests_c(LowLevelKnitIndexTests):
 
832
 
 
833
    _test_needs_features = [CompiledKnitFeature]
 
834
 
 
835
    def get_knit_index(self, *args, **kwargs):
 
836
        orig = knit._load_data
 
837
        def reset():
 
838
            knit._load_data = orig
 
839
        self.addCleanup(reset)
 
840
        from bzrlib._knit_load_data_c import _load_data_c
 
841
        knit._load_data = _load_data_c
 
842
        return _KnitIndex(*args, **kwargs)
 
843
 
 
844
 
 
845
 
 
846
class KnitTests(TestCaseWithTransport):
 
847
    """Class containing knit test helper routines."""
 
848
 
 
849
    def make_test_knit(self, annotate=False, delay_create=False):
 
850
        if not annotate:
 
851
            factory = KnitPlainFactory()
 
852
        else:
 
853
            factory = None
 
854
        return KnitVersionedFile('test', get_transport('.'), access_mode='w',
 
855
                                 factory=factory, create=True,
 
856
                                 delay_create=delay_create)
 
857
 
 
858
 
 
859
class BasicKnitTests(KnitTests):
 
860
 
 
861
    def add_stock_one_and_one_a(self, k):
 
862
        k.add_lines('text-1', [], split_lines(TEXT_1))
 
863
        k.add_lines('text-1a', ['text-1'], split_lines(TEXT_1A))
 
864
 
 
865
    def test_knit_constructor(self):
 
866
        """Construct empty k"""
 
867
        self.make_test_knit()
 
868
 
 
869
    def test_knit_add(self):
 
870
        """Store one text in knit and retrieve"""
 
871
        k = self.make_test_knit()
 
872
        k.add_lines('text-1', [], split_lines(TEXT_1))
 
873
        self.assertTrue(k.has_version('text-1'))
 
874
        self.assertEqualDiff(''.join(k.get_lines('text-1')), TEXT_1)
 
875
 
 
876
    def test_knit_reload(self):
 
877
        # test that the content in a reloaded knit is correct
 
878
        k = self.make_test_knit()
 
879
        k.add_lines('text-1', [], split_lines(TEXT_1))
 
880
        del k
 
881
        k2 = KnitVersionedFile('test', get_transport('.'), access_mode='r', factory=KnitPlainFactory(), create=True)
 
882
        self.assertTrue(k2.has_version('text-1'))
 
883
        self.assertEqualDiff(''.join(k2.get_lines('text-1')), TEXT_1)
 
884
 
 
885
    def test_knit_several(self):
 
886
        """Store several texts in a knit"""
 
887
        k = self.make_test_knit()
 
888
        k.add_lines('text-1', [], split_lines(TEXT_1))
 
889
        k.add_lines('text-2', [], split_lines(TEXT_2))
 
890
        self.assertEqualDiff(''.join(k.get_lines('text-1')), TEXT_1)
 
891
        self.assertEqualDiff(''.join(k.get_lines('text-2')), TEXT_2)
 
892
        
 
893
    def test_repeated_add(self):
 
894
        """Knit traps attempt to replace existing version"""
 
895
        k = self.make_test_knit()
 
896
        k.add_lines('text-1', [], split_lines(TEXT_1))
 
897
        self.assertRaises(RevisionAlreadyPresent, 
 
898
                k.add_lines,
 
899
                'text-1', [], split_lines(TEXT_1))
 
900
 
 
901
    def test_empty(self):
 
902
        k = self.make_test_knit(True)
 
903
        k.add_lines('text-1', [], [])
 
904
        self.assertEquals(k.get_lines('text-1'), [])
 
905
 
 
906
    def test_incomplete(self):
 
907
        """Test if texts without a ending line-end can be inserted and
 
908
        extracted."""
 
909
        k = KnitVersionedFile('test', get_transport('.'), delta=False, create=True)
 
910
        k.add_lines('text-1', [], ['a\n',    'b'  ])
 
911
        k.add_lines('text-2', ['text-1'], ['a\rb\n', 'b\n'])
 
912
        # reopening ensures maximum room for confusion
 
913
        k = KnitVersionedFile('test', get_transport('.'), delta=False, create=True)
 
914
        self.assertEquals(k.get_lines('text-1'), ['a\n',    'b'  ])
 
915
        self.assertEquals(k.get_lines('text-2'), ['a\rb\n', 'b\n'])
 
916
 
 
917
    def test_delta(self):
 
918
        """Expression of knit delta as lines"""
 
919
        k = self.make_test_knit()
 
920
        KnitContent
 
921
        td = list(line_delta(TEXT_1.splitlines(True),
 
922
                             TEXT_1A.splitlines(True)))
 
923
        self.assertEqualDiff(''.join(td), delta_1_1a)
 
924
        out = apply_line_delta(TEXT_1.splitlines(True), td)
 
925
        self.assertEqualDiff(''.join(out), TEXT_1A)
 
926
 
 
927
    def assertDerivedBlocksEqual(self, source, target, noeol=False):
 
928
        """Assert that the derived matching blocks match real output"""
 
929
        source_lines = source.splitlines(True)
 
930
        target_lines = target.splitlines(True)
 
931
        def nl(line):
 
932
            if noeol and not line.endswith('\n'):
 
933
                return line + '\n'
 
934
            else:
 
935
                return line
 
936
        source_content = KnitContent([(None, nl(l)) for l in source_lines])
 
937
        target_content = KnitContent([(None, nl(l)) for l in target_lines])
 
938
        line_delta = source_content.line_delta(target_content)
 
939
        delta_blocks = list(KnitContent.get_line_delta_blocks(line_delta,
 
940
            source_lines, target_lines))
 
941
        matcher = KnitSequenceMatcher(None, source_lines, target_lines)
 
942
        matcher_blocks = list(list(matcher.get_matching_blocks()))
 
943
        self.assertEqual(matcher_blocks, delta_blocks)
 
944
 
 
945
    def test_get_line_delta_blocks(self):
 
946
        self.assertDerivedBlocksEqual('a\nb\nc\n', 'q\nc\n')
 
947
        self.assertDerivedBlocksEqual(TEXT_1, TEXT_1)
 
948
        self.assertDerivedBlocksEqual(TEXT_1, TEXT_1A)
 
949
        self.assertDerivedBlocksEqual(TEXT_1, TEXT_1B)
 
950
        self.assertDerivedBlocksEqual(TEXT_1B, TEXT_1A)
 
951
        self.assertDerivedBlocksEqual(TEXT_1A, TEXT_1B)
 
952
        self.assertDerivedBlocksEqual(TEXT_1A, '')
 
953
        self.assertDerivedBlocksEqual('', TEXT_1A)
 
954
        self.assertDerivedBlocksEqual('', '')
 
955
        self.assertDerivedBlocksEqual('a\nb\nc', 'a\nb\nc\nd')
 
956
 
 
957
    def test_get_line_delta_blocks_noeol(self):
 
958
        """Handle historical knit deltas safely
 
959
 
 
960
        Some existing knit deltas don't consider the last line to differ
 
961
        when the only difference whether it has a final newline.
 
962
 
 
963
        New knit deltas appear to always consider the last line to differ
 
964
        in this case.
 
965
        """
 
966
        self.assertDerivedBlocksEqual('a\nb\nc', 'a\nb\nc\nd\n', noeol=True)
 
967
        self.assertDerivedBlocksEqual('a\nb\nc\nd\n', 'a\nb\nc', noeol=True)
 
968
        self.assertDerivedBlocksEqual('a\nb\nc\n', 'a\nb\nc', noeol=True)
 
969
        self.assertDerivedBlocksEqual('a\nb\nc', 'a\nb\nc\n', noeol=True)
 
970
 
 
971
    def test_add_with_parents(self):
 
972
        """Store in knit with parents"""
 
973
        k = self.make_test_knit()
 
974
        self.add_stock_one_and_one_a(k)
 
975
        self.assertEquals(k.get_parents('text-1'), [])
 
976
        self.assertEquals(k.get_parents('text-1a'), ['text-1'])
 
977
 
 
978
    def test_ancestry(self):
 
979
        """Store in knit with parents"""
 
980
        k = self.make_test_knit()
 
981
        self.add_stock_one_and_one_a(k)
 
982
        self.assertEquals(set(k.get_ancestry(['text-1a'])), set(['text-1a', 'text-1']))
 
983
 
 
984
    def test_add_delta(self):
 
985
        """Store in knit with parents"""
 
986
        k = KnitVersionedFile('test', get_transport('.'), factory=KnitPlainFactory(),
 
987
            delta=True, create=True)
 
988
        self.add_stock_one_and_one_a(k)
 
989
        k.clear_cache()
 
990
        self.assertEqualDiff(''.join(k.get_lines('text-1a')), TEXT_1A)
 
991
 
 
992
    def test_annotate(self):
 
993
        """Annotations"""
 
994
        k = KnitVersionedFile('knit', get_transport('.'), factory=KnitAnnotateFactory(),
 
995
            delta=True, create=True)
 
996
        self.insert_and_test_small_annotate(k)
 
997
 
 
998
    def insert_and_test_small_annotate(self, k):
 
999
        """test annotation with k works correctly."""
 
1000
        k.add_lines('text-1', [], ['a\n', 'b\n'])
 
1001
        k.add_lines('text-2', ['text-1'], ['a\n', 'c\n'])
 
1002
 
 
1003
        origins = k.annotate('text-2')
 
1004
        self.assertEquals(origins[0], ('text-1', 'a\n'))
 
1005
        self.assertEquals(origins[1], ('text-2', 'c\n'))
 
1006
 
 
1007
    def test_annotate_fulltext(self):
 
1008
        """Annotations"""
 
1009
        k = KnitVersionedFile('knit', get_transport('.'), factory=KnitAnnotateFactory(),
 
1010
            delta=False, create=True)
 
1011
        self.insert_and_test_small_annotate(k)
 
1012
 
 
1013
    def test_annotate_merge_1(self):
 
1014
        k = self.make_test_knit(True)
 
1015
        k.add_lines('text-a1', [], ['a\n', 'b\n'])
 
1016
        k.add_lines('text-a2', [], ['d\n', 'c\n'])
 
1017
        k.add_lines('text-am', ['text-a1', 'text-a2'], ['d\n', 'b\n'])
 
1018
        origins = k.annotate('text-am')
 
1019
        self.assertEquals(origins[0], ('text-a2', 'd\n'))
 
1020
        self.assertEquals(origins[1], ('text-a1', 'b\n'))
 
1021
 
 
1022
    def test_annotate_merge_2(self):
 
1023
        k = self.make_test_knit(True)
 
1024
        k.add_lines('text-a1', [], ['a\n', 'b\n', 'c\n'])
 
1025
        k.add_lines('text-a2', [], ['x\n', 'y\n', 'z\n'])
 
1026
        k.add_lines('text-am', ['text-a1', 'text-a2'], ['a\n', 'y\n', 'c\n'])
 
1027
        origins = k.annotate('text-am')
 
1028
        self.assertEquals(origins[0], ('text-a1', 'a\n'))
 
1029
        self.assertEquals(origins[1], ('text-a2', 'y\n'))
 
1030
        self.assertEquals(origins[2], ('text-a1', 'c\n'))
 
1031
 
 
1032
    def test_annotate_merge_9(self):
 
1033
        k = self.make_test_knit(True)
 
1034
        k.add_lines('text-a1', [], ['a\n', 'b\n', 'c\n'])
 
1035
        k.add_lines('text-a2', [], ['x\n', 'y\n', 'z\n'])
 
1036
        k.add_lines('text-am', ['text-a1', 'text-a2'], ['k\n', 'y\n', 'c\n'])
 
1037
        origins = k.annotate('text-am')
 
1038
        self.assertEquals(origins[0], ('text-am', 'k\n'))
 
1039
        self.assertEquals(origins[1], ('text-a2', 'y\n'))
 
1040
        self.assertEquals(origins[2], ('text-a1', 'c\n'))
 
1041
 
 
1042
    def test_annotate_merge_3(self):
 
1043
        k = self.make_test_knit(True)
 
1044
        k.add_lines('text-a1', [], ['a\n', 'b\n', 'c\n'])
 
1045
        k.add_lines('text-a2', [] ,['x\n', 'y\n', 'z\n'])
 
1046
        k.add_lines('text-am', ['text-a1', 'text-a2'], ['k\n', 'y\n', 'z\n'])
 
1047
        origins = k.annotate('text-am')
 
1048
        self.assertEquals(origins[0], ('text-am', 'k\n'))
 
1049
        self.assertEquals(origins[1], ('text-a2', 'y\n'))
 
1050
        self.assertEquals(origins[2], ('text-a2', 'z\n'))
 
1051
 
 
1052
    def test_annotate_merge_4(self):
 
1053
        k = self.make_test_knit(True)
 
1054
        k.add_lines('text-a1', [], ['a\n', 'b\n', 'c\n'])
 
1055
        k.add_lines('text-a2', [], ['x\n', 'y\n', 'z\n'])
 
1056
        k.add_lines('text-a3', ['text-a1'], ['a\n', 'b\n', 'p\n'])
 
1057
        k.add_lines('text-am', ['text-a2', 'text-a3'], ['a\n', 'b\n', 'z\n'])
 
1058
        origins = k.annotate('text-am')
 
1059
        self.assertEquals(origins[0], ('text-a1', 'a\n'))
 
1060
        self.assertEquals(origins[1], ('text-a1', 'b\n'))
 
1061
        self.assertEquals(origins[2], ('text-a2', 'z\n'))
 
1062
 
 
1063
    def test_annotate_merge_5(self):
 
1064
        k = self.make_test_knit(True)
 
1065
        k.add_lines('text-a1', [], ['a\n', 'b\n', 'c\n'])
 
1066
        k.add_lines('text-a2', [], ['d\n', 'e\n', 'f\n'])
 
1067
        k.add_lines('text-a3', [], ['x\n', 'y\n', 'z\n'])
 
1068
        k.add_lines('text-am',
 
1069
                    ['text-a1', 'text-a2', 'text-a3'],
 
1070
                    ['a\n', 'e\n', 'z\n'])
 
1071
        origins = k.annotate('text-am')
 
1072
        self.assertEquals(origins[0], ('text-a1', 'a\n'))
 
1073
        self.assertEquals(origins[1], ('text-a2', 'e\n'))
 
1074
        self.assertEquals(origins[2], ('text-a3', 'z\n'))
 
1075
 
 
1076
    def test_annotate_file_cherry_pick(self):
 
1077
        k = self.make_test_knit(True)
 
1078
        k.add_lines('text-1', [], ['a\n', 'b\n', 'c\n'])
 
1079
        k.add_lines('text-2', ['text-1'], ['d\n', 'e\n', 'f\n'])
 
1080
        k.add_lines('text-3', ['text-2', 'text-1'], ['a\n', 'b\n', 'c\n'])
 
1081
        origins = k.annotate('text-3')
 
1082
        self.assertEquals(origins[0], ('text-1', 'a\n'))
 
1083
        self.assertEquals(origins[1], ('text-1', 'b\n'))
 
1084
        self.assertEquals(origins[2], ('text-1', 'c\n'))
 
1085
 
 
1086
    def test_knit_join(self):
 
1087
        """Store in knit with parents"""
 
1088
        k1 = KnitVersionedFile('test1', get_transport('.'), factory=KnitPlainFactory(), create=True)
 
1089
        k1.add_lines('text-a', [], split_lines(TEXT_1))
 
1090
        k1.add_lines('text-b', ['text-a'], split_lines(TEXT_1))
 
1091
 
 
1092
        k1.add_lines('text-c', [], split_lines(TEXT_1))
 
1093
        k1.add_lines('text-d', ['text-c'], split_lines(TEXT_1))
 
1094
 
 
1095
        k1.add_lines('text-m', ['text-b', 'text-d'], split_lines(TEXT_1))
 
1096
 
 
1097
        k2 = KnitVersionedFile('test2', get_transport('.'), factory=KnitPlainFactory(), create=True)
 
1098
        count = k2.join(k1, version_ids=['text-m'])
 
1099
        self.assertEquals(count, 5)
 
1100
        self.assertTrue(k2.has_version('text-a'))
 
1101
        self.assertTrue(k2.has_version('text-c'))
 
1102
 
 
1103
    def test_reannotate(self):
 
1104
        k1 = KnitVersionedFile('knit1', get_transport('.'),
 
1105
                               factory=KnitAnnotateFactory(), create=True)
 
1106
        # 0
 
1107
        k1.add_lines('text-a', [], ['a\n', 'b\n'])
 
1108
        # 1
 
1109
        k1.add_lines('text-b', ['text-a'], ['a\n', 'c\n'])
 
1110
 
 
1111
        k2 = KnitVersionedFile('test2', get_transport('.'),
 
1112
                               factory=KnitAnnotateFactory(), create=True)
 
1113
        k2.join(k1, version_ids=['text-b'])
 
1114
 
 
1115
        # 2
 
1116
        k1.add_lines('text-X', ['text-b'], ['a\n', 'b\n'])
 
1117
        # 2
 
1118
        k2.add_lines('text-c', ['text-b'], ['z\n', 'c\n'])
 
1119
        # 3
 
1120
        k2.add_lines('text-Y', ['text-b'], ['b\n', 'c\n'])
 
1121
 
 
1122
        # test-c will have index 3
 
1123
        k1.join(k2, version_ids=['text-c'])
 
1124
 
 
1125
        lines = k1.get_lines('text-c')
 
1126
        self.assertEquals(lines, ['z\n', 'c\n'])
 
1127
 
 
1128
        origins = k1.annotate('text-c')
 
1129
        self.assertEquals(origins[0], ('text-c', 'z\n'))
 
1130
        self.assertEquals(origins[1], ('text-b', 'c\n'))
 
1131
 
 
1132
    def test_get_line_delta_texts(self):
 
1133
        """Make sure we can call get_texts on text with reused line deltas"""
 
1134
        k1 = KnitVersionedFile('test1', get_transport('.'), 
 
1135
                               factory=KnitPlainFactory(), create=True)
 
1136
        for t in range(3):
 
1137
            if t == 0:
 
1138
                parents = []
 
1139
            else:
 
1140
                parents = ['%d' % (t-1)]
 
1141
            k1.add_lines('%d' % t, parents, ['hello\n'] * t)
 
1142
        k1.get_texts(('%d' % t) for t in range(3))
 
1143
        
 
1144
    def test_iter_lines_reads_in_order(self):
 
1145
        t = MemoryTransport()
 
1146
        instrumented_t = TransportLogger(t)
 
1147
        k1 = KnitVersionedFile('id', instrumented_t, create=True, delta=True)
 
1148
        self.assertEqual([('id.kndx',)], instrumented_t._calls)
 
1149
        # add texts with no required ordering
 
1150
        k1.add_lines('base', [], ['text\n'])
 
1151
        k1.add_lines('base2', [], ['text2\n'])
 
1152
        k1.clear_cache()
 
1153
        instrumented_t._calls = []
 
1154
        # request a last-first iteration
 
1155
        results = list(k1.iter_lines_added_or_present_in_versions(['base2', 'base']))
 
1156
        self.assertEqual([('id.knit', [(0, 87), (87, 89)])], instrumented_t._calls)
 
1157
        self.assertEqual(['text\n', 'text2\n'], results)
 
1158
 
 
1159
    def test_create_empty_annotated(self):
 
1160
        k1 = self.make_test_knit(True)
 
1161
        # 0
 
1162
        k1.add_lines('text-a', [], ['a\n', 'b\n'])
 
1163
        k2 = k1.create_empty('t', MemoryTransport())
 
1164
        self.assertTrue(isinstance(k2.factory, KnitAnnotateFactory))
 
1165
        self.assertEqual(k1.delta, k2.delta)
 
1166
        # the generic test checks for empty content and file class
 
1167
 
 
1168
    def test_knit_format(self):
 
1169
        # this tests that a new knit index file has the expected content
 
1170
        # and that is writes the data we expect as records are added.
 
1171
        knit = self.make_test_knit(True)
 
1172
        # Now knit files are not created until we first add data to them
 
1173
        self.assertFileEqual("# bzr knit index 8\n", 'test.kndx')
 
1174
        knit.add_lines_with_ghosts('revid', ['a_ghost'], ['a\n'])
 
1175
        self.assertFileEqual(
 
1176
            "# bzr knit index 8\n"
 
1177
            "\n"
 
1178
            "revid fulltext 0 84 .a_ghost :",
 
1179
            'test.kndx')
 
1180
        knit.add_lines_with_ghosts('revid2', ['revid'], ['a\n'])
 
1181
        self.assertFileEqual(
 
1182
            "# bzr knit index 8\n"
 
1183
            "\nrevid fulltext 0 84 .a_ghost :"
 
1184
            "\nrevid2 line-delta 84 82 0 :",
 
1185
            'test.kndx')
 
1186
        # we should be able to load this file again
 
1187
        knit = KnitVersionedFile('test', get_transport('.'), access_mode='r')
 
1188
        self.assertEqual(['revid', 'revid2'], knit.versions())
 
1189
        # write a short write to the file and ensure that its ignored
 
1190
        indexfile = file('test.kndx', 'ab')
 
1191
        indexfile.write('\nrevid3 line-delta 166 82 1 2 3 4 5 .phwoar:demo ')
 
1192
        indexfile.close()
 
1193
        # we should be able to load this file again
 
1194
        knit = KnitVersionedFile('test', get_transport('.'), access_mode='w')
 
1195
        self.assertEqual(['revid', 'revid2'], knit.versions())
 
1196
        # and add a revision with the same id the failed write had
 
1197
        knit.add_lines('revid3', ['revid2'], ['a\n'])
 
1198
        # and when reading it revid3 should now appear.
 
1199
        knit = KnitVersionedFile('test', get_transport('.'), access_mode='r')
 
1200
        self.assertEqual(['revid', 'revid2', 'revid3'], knit.versions())
 
1201
        self.assertEqual(['revid2'], knit.get_parents('revid3'))
 
1202
 
 
1203
    def test_delay_create(self):
 
1204
        """Test that passing delay_create=True creates files late"""
 
1205
        knit = self.make_test_knit(annotate=True, delay_create=True)
 
1206
        self.failIfExists('test.knit')
 
1207
        self.failIfExists('test.kndx')
 
1208
        knit.add_lines_with_ghosts('revid', ['a_ghost'], ['a\n'])
 
1209
        self.failUnlessExists('test.knit')
 
1210
        self.assertFileEqual(
 
1211
            "# bzr knit index 8\n"
 
1212
            "\n"
 
1213
            "revid fulltext 0 84 .a_ghost :",
 
1214
            'test.kndx')
 
1215
 
 
1216
    def test_create_parent_dir(self):
 
1217
        """create_parent_dir can create knits in nonexistant dirs"""
 
1218
        # Has no effect if we don't set 'delay_create'
 
1219
        trans = get_transport('.')
 
1220
        self.assertRaises(NoSuchFile, KnitVersionedFile, 'dir/test',
 
1221
                          trans, access_mode='w', factory=None,
 
1222
                          create=True, create_parent_dir=True)
 
1223
        # Nothing should have changed yet
 
1224
        knit = KnitVersionedFile('dir/test', trans, access_mode='w',
 
1225
                                 factory=None, create=True,
 
1226
                                 create_parent_dir=True,
 
1227
                                 delay_create=True)
 
1228
        self.failIfExists('dir/test.knit')
 
1229
        self.failIfExists('dir/test.kndx')
 
1230
        self.failIfExists('dir')
 
1231
        knit.add_lines('revid', [], ['a\n'])
 
1232
        self.failUnlessExists('dir')
 
1233
        self.failUnlessExists('dir/test.knit')
 
1234
        self.assertFileEqual(
 
1235
            "# bzr knit index 8\n"
 
1236
            "\n"
 
1237
            "revid fulltext 0 84  :",
 
1238
            'dir/test.kndx')
 
1239
 
 
1240
    def test_create_mode_700(self):
 
1241
        trans = get_transport('.')
 
1242
        if not trans._can_roundtrip_unix_modebits():
 
1243
            # Can't roundtrip, so no need to run this test
 
1244
            return
 
1245
        knit = KnitVersionedFile('dir/test', trans, access_mode='w',
 
1246
                                 factory=None, create=True,
 
1247
                                 create_parent_dir=True,
 
1248
                                 delay_create=True,
 
1249
                                 file_mode=0600,
 
1250
                                 dir_mode=0700)
 
1251
        knit.add_lines('revid', [], ['a\n'])
 
1252
        self.assertTransportMode(trans, 'dir', 0700)
 
1253
        self.assertTransportMode(trans, 'dir/test.knit', 0600)
 
1254
        self.assertTransportMode(trans, 'dir/test.kndx', 0600)
 
1255
 
 
1256
    def test_create_mode_770(self):
 
1257
        trans = get_transport('.')
 
1258
        if not trans._can_roundtrip_unix_modebits():
 
1259
            # Can't roundtrip, so no need to run this test
 
1260
            return
 
1261
        knit = KnitVersionedFile('dir/test', trans, access_mode='w',
 
1262
                                 factory=None, create=True,
 
1263
                                 create_parent_dir=True,
 
1264
                                 delay_create=True,
 
1265
                                 file_mode=0660,
 
1266
                                 dir_mode=0770)
 
1267
        knit.add_lines('revid', [], ['a\n'])
 
1268
        self.assertTransportMode(trans, 'dir', 0770)
 
1269
        self.assertTransportMode(trans, 'dir/test.knit', 0660)
 
1270
        self.assertTransportMode(trans, 'dir/test.kndx', 0660)
 
1271
 
 
1272
    def test_create_mode_777(self):
 
1273
        trans = get_transport('.')
 
1274
        if not trans._can_roundtrip_unix_modebits():
 
1275
            # Can't roundtrip, so no need to run this test
 
1276
            return
 
1277
        knit = KnitVersionedFile('dir/test', trans, access_mode='w',
 
1278
                                 factory=None, create=True,
 
1279
                                 create_parent_dir=True,
 
1280
                                 delay_create=True,
 
1281
                                 file_mode=0666,
 
1282
                                 dir_mode=0777)
 
1283
        knit.add_lines('revid', [], ['a\n'])
 
1284
        self.assertTransportMode(trans, 'dir', 0777)
 
1285
        self.assertTransportMode(trans, 'dir/test.knit', 0666)
 
1286
        self.assertTransportMode(trans, 'dir/test.kndx', 0666)
 
1287
 
 
1288
    def test_plan_merge(self):
 
1289
        my_knit = self.make_test_knit(annotate=True)
 
1290
        my_knit.add_lines('text1', [], split_lines(TEXT_1))
 
1291
        my_knit.add_lines('text1a', ['text1'], split_lines(TEXT_1A))
 
1292
        my_knit.add_lines('text1b', ['text1'], split_lines(TEXT_1B))
 
1293
        plan = list(my_knit.plan_merge('text1a', 'text1b'))
 
1294
        for plan_line, expected_line in zip(plan, AB_MERGE):
 
1295
            self.assertEqual(plan_line, expected_line)
 
1296
 
 
1297
 
 
1298
TEXT_1 = """\
 
1299
Banana cup cakes:
 
1300
 
 
1301
- bananas
 
1302
- eggs
 
1303
- broken tea cups
 
1304
"""
 
1305
 
 
1306
TEXT_1A = """\
 
1307
Banana cup cake recipe
 
1308
(serves 6)
 
1309
 
 
1310
- bananas
 
1311
- eggs
 
1312
- broken tea cups
 
1313
- self-raising flour
 
1314
"""
 
1315
 
 
1316
TEXT_1B = """\
 
1317
Banana cup cake recipe
 
1318
 
 
1319
- bananas (do not use plantains!!!)
 
1320
- broken tea cups
 
1321
- flour
 
1322
"""
 
1323
 
 
1324
delta_1_1a = """\
 
1325
0,1,2
 
1326
Banana cup cake recipe
 
1327
(serves 6)
 
1328
5,5,1
 
1329
- self-raising flour
 
1330
"""
 
1331
 
 
1332
TEXT_2 = """\
 
1333
Boeuf bourguignon
 
1334
 
 
1335
- beef
 
1336
- red wine
 
1337
- small onions
 
1338
- carrot
 
1339
- mushrooms
 
1340
"""
 
1341
 
 
1342
AB_MERGE_TEXT="""unchanged|Banana cup cake recipe
 
1343
new-a|(serves 6)
 
1344
unchanged|
 
1345
killed-b|- bananas
 
1346
killed-b|- eggs
 
1347
new-b|- bananas (do not use plantains!!!)
 
1348
unchanged|- broken tea cups
 
1349
new-a|- self-raising flour
 
1350
new-b|- flour
 
1351
"""
 
1352
AB_MERGE=[tuple(l.split('|')) for l in AB_MERGE_TEXT.splitlines(True)]
 
1353
 
 
1354
 
 
1355
def line_delta(from_lines, to_lines):
 
1356
    """Generate line-based delta from one text to another"""
 
1357
    s = difflib.SequenceMatcher(None, from_lines, to_lines)
 
1358
    for op in s.get_opcodes():
 
1359
        if op[0] == 'equal':
 
1360
            continue
 
1361
        yield '%d,%d,%d\n' % (op[1], op[2], op[4]-op[3])
 
1362
        for i in range(op[3], op[4]):
 
1363
            yield to_lines[i]
 
1364
 
 
1365
 
 
1366
def apply_line_delta(basis_lines, delta_lines):
 
1367
    """Apply a line-based perfect diff
 
1368
    
 
1369
    basis_lines -- text to apply the patch to
 
1370
    delta_lines -- diff instructions and content
 
1371
    """
 
1372
    out = basis_lines[:]
 
1373
    i = 0
 
1374
    offset = 0
 
1375
    while i < len(delta_lines):
 
1376
        l = delta_lines[i]
 
1377
        a, b, c = map(long, l.split(','))
 
1378
        i = i + 1
 
1379
        out[offset+a:offset+b] = delta_lines[i:i+c]
 
1380
        i = i + c
 
1381
        offset = offset + (b - a) + c
 
1382
    return out
 
1383
 
 
1384
 
 
1385
class TestWeaveToKnit(KnitTests):
 
1386
 
 
1387
    def test_weave_to_knit_matches(self):
 
1388
        # check that the WeaveToKnit is_compatible function
 
1389
        # registers True for a Weave to a Knit.
 
1390
        w = Weave()
 
1391
        k = self.make_test_knit()
 
1392
        self.failUnless(WeaveToKnit.is_compatible(w, k))
 
1393
        self.failIf(WeaveToKnit.is_compatible(k, w))
 
1394
        self.failIf(WeaveToKnit.is_compatible(w, w))
 
1395
        self.failIf(WeaveToKnit.is_compatible(k, k))
 
1396
 
 
1397
 
 
1398
class TestKnitCaching(KnitTests):
 
1399
    
 
1400
    def create_knit(self, cache_add=False):
 
1401
        k = self.make_test_knit(True)
 
1402
        if cache_add:
 
1403
            k.enable_cache()
 
1404
 
 
1405
        k.add_lines('text-1', [], split_lines(TEXT_1))
 
1406
        k.add_lines('text-2', [], split_lines(TEXT_2))
 
1407
        return k
 
1408
 
 
1409
    def test_no_caching(self):
 
1410
        k = self.create_knit()
 
1411
        # Nothing should be cached without setting 'enable_cache'
 
1412
        self.assertEqual({}, k._data._cache)
 
1413
 
 
1414
    def test_cache_add_and_clear(self):
 
1415
        k = self.create_knit(True)
 
1416
 
 
1417
        self.assertEqual(['text-1', 'text-2'], sorted(k._data._cache.keys()))
 
1418
 
 
1419
        k.clear_cache()
 
1420
        self.assertEqual({}, k._data._cache)
 
1421
 
 
1422
    def test_cache_data_read_raw(self):
 
1423
        k = self.create_knit()
 
1424
 
 
1425
        # Now cache and read
 
1426
        k.enable_cache()
 
1427
 
 
1428
        def read_one_raw(version):
 
1429
            pos_map = k._get_components_positions([version])
 
1430
            method, pos, size, next = pos_map[version]
 
1431
            lst = list(k._data.read_records_iter_raw([(version, pos, size)]))
 
1432
            self.assertEqual(1, len(lst))
 
1433
            return lst[0]
 
1434
 
 
1435
        val = read_one_raw('text-1')
 
1436
        self.assertEqual({'text-1':val[1]}, k._data._cache)
 
1437
 
 
1438
        k.clear_cache()
 
1439
        # After clear, new reads are not cached
 
1440
        self.assertEqual({}, k._data._cache)
 
1441
 
 
1442
        val2 = read_one_raw('text-1')
 
1443
        self.assertEqual(val, val2)
 
1444
        self.assertEqual({}, k._data._cache)
 
1445
 
 
1446
    def test_cache_data_read(self):
 
1447
        k = self.create_knit()
 
1448
 
 
1449
        def read_one(version):
 
1450
            pos_map = k._get_components_positions([version])
 
1451
            method, pos, size, next = pos_map[version]
 
1452
            lst = list(k._data.read_records_iter([(version, pos, size)]))
 
1453
            self.assertEqual(1, len(lst))
 
1454
            return lst[0]
 
1455
 
 
1456
        # Now cache and read
 
1457
        k.enable_cache()
 
1458
 
 
1459
        val = read_one('text-2')
 
1460
        self.assertEqual(['text-2'], k._data._cache.keys())
 
1461
        self.assertEqual('text-2', val[0])
 
1462
        content, digest = k._data._parse_record('text-2',
 
1463
                                                k._data._cache['text-2'])
 
1464
        self.assertEqual(content, val[1])
 
1465
        self.assertEqual(digest, val[2])
 
1466
 
 
1467
        k.clear_cache()
 
1468
        self.assertEqual({}, k._data._cache)
 
1469
 
 
1470
        val2 = read_one('text-2')
 
1471
        self.assertEqual(val, val2)
 
1472
        self.assertEqual({}, k._data._cache)
 
1473
 
 
1474
    def test_cache_read(self):
 
1475
        k = self.create_knit()
 
1476
        k.enable_cache()
 
1477
 
 
1478
        text = k.get_text('text-1')
 
1479
        self.assertEqual(TEXT_1, text)
 
1480
        self.assertEqual(['text-1'], k._data._cache.keys())
 
1481
 
 
1482
        k.clear_cache()
 
1483
        self.assertEqual({}, k._data._cache)
 
1484
 
 
1485
        text = k.get_text('text-1')
 
1486
        self.assertEqual(TEXT_1, text)
 
1487
        self.assertEqual({}, k._data._cache)
 
1488
 
 
1489
 
 
1490
class TestKnitIndex(KnitTests):
 
1491
 
 
1492
    def test_add_versions_dictionary_compresses(self):
 
1493
        """Adding versions to the index should update the lookup dict"""
 
1494
        knit = self.make_test_knit()
 
1495
        idx = knit._index
 
1496
        idx.add_version('a-1', ['fulltext'], 0, 0, [])
 
1497
        self.check_file_contents('test.kndx',
 
1498
            '# bzr knit index 8\n'
 
1499
            '\n'
 
1500
            'a-1 fulltext 0 0  :'
 
1501
            )
 
1502
        idx.add_versions([('a-2', ['fulltext'], 0, 0, ['a-1']),
 
1503
                          ('a-3', ['fulltext'], 0, 0, ['a-2']),
 
1504
                         ])
 
1505
        self.check_file_contents('test.kndx',
 
1506
            '# bzr knit index 8\n'
 
1507
            '\n'
 
1508
            'a-1 fulltext 0 0  :\n'
 
1509
            'a-2 fulltext 0 0 0 :\n'
 
1510
            'a-3 fulltext 0 0 1 :'
 
1511
            )
 
1512
        self.assertEqual(['a-1', 'a-2', 'a-3'], idx._history)
 
1513
        self.assertEqual({'a-1':('a-1', ['fulltext'], 0, 0, [], 0),
 
1514
                          'a-2':('a-2', ['fulltext'], 0, 0, ['a-1'], 1),
 
1515
                          'a-3':('a-3', ['fulltext'], 0, 0, ['a-2'], 2),
 
1516
                         }, idx._cache)
 
1517
 
 
1518
    def test_add_versions_fails_clean(self):
 
1519
        """If add_versions fails in the middle, it restores a pristine state.
 
1520
 
 
1521
        Any modifications that are made to the index are reset if all versions
 
1522
        cannot be added.
 
1523
        """
 
1524
        # This cheats a little bit by passing in a generator which will
 
1525
        # raise an exception before the processing finishes
 
1526
        # Other possibilities would be to have an version with the wrong number
 
1527
        # of entries, or to make the backing transport unable to write any
 
1528
        # files.
 
1529
 
 
1530
        knit = self.make_test_knit()
 
1531
        idx = knit._index
 
1532
        idx.add_version('a-1', ['fulltext'], 0, 0, [])
 
1533
 
 
1534
        class StopEarly(Exception):
 
1535
            pass
 
1536
 
 
1537
        def generate_failure():
 
1538
            """Add some entries and then raise an exception"""
 
1539
            yield ('a-2', ['fulltext'], 0, 0, ['a-1'])
 
1540
            yield ('a-3', ['fulltext'], 0, 0, ['a-2'])
 
1541
            raise StopEarly()
 
1542
 
 
1543
        # Assert the pre-condition
 
1544
        self.assertEqual(['a-1'], idx._history)
 
1545
        self.assertEqual({'a-1':('a-1', ['fulltext'], 0, 0, [], 0)}, idx._cache)
 
1546
 
 
1547
        self.assertRaises(StopEarly, idx.add_versions, generate_failure())
 
1548
 
 
1549
        # And it shouldn't be modified
 
1550
        self.assertEqual(['a-1'], idx._history)
 
1551
        self.assertEqual({'a-1':('a-1', ['fulltext'], 0, 0, [], 0)}, idx._cache)
 
1552
 
 
1553
    def test_knit_index_ignores_empty_files(self):
 
1554
        # There was a race condition in older bzr, where a ^C at the right time
 
1555
        # could leave an empty .kndx file, which bzr would later claim was a
 
1556
        # corrupted file since the header was not present. In reality, the file
 
1557
        # just wasn't created, so it should be ignored.
 
1558
        t = get_transport('.')
 
1559
        t.put_bytes('test.kndx', '')
 
1560
 
 
1561
        knit = self.make_test_knit()
 
1562
 
 
1563
    def test_knit_index_checks_header(self):
 
1564
        t = get_transport('.')
 
1565
        t.put_bytes('test.kndx', '# not really a knit header\n\n')
 
1566
 
 
1567
        self.assertRaises(KnitHeaderError, self.make_test_knit)