/brz/remove-bazaar

To get this branch, use:
bzr branch http://gegoxaren.bato24.eu/bzr/brz/remove-bazaar

« back to all changes in this revision

Viewing changes to bzrlib/tests/test_knit.py

  • Committer: Ian Clatworthy
  • Date: 2007-09-03 01:33:49 UTC
  • mto: (2779.1.1 ianc-integration)
  • mto: This revision was merged to the branch mainline in revision 2783.
  • Revision ID: ian.clatworthy@internode.on.net-20070903013349-o37wm1bxe04pctrm
Incorporate feedback from poolie

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
# Copyright (C) 2005, 2006, 2007 Canonical Ltd
 
2
#
 
3
# This program is free software; you can redistribute it and/or modify
 
4
# it under the terms of the GNU General Public License as published by
 
5
# the Free Software Foundation; either version 2 of the License, or
 
6
# (at your option) any later version.
 
7
#
 
8
# This program is distributed in the hope that it will be useful,
 
9
# but WITHOUT ANY WARRANTY; without even the implied warranty of
 
10
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
11
# GNU General Public License for more details.
 
12
#
 
13
# You should have received a copy of the GNU General Public License
 
14
# along with this program; if not, write to the Free Software
 
15
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
 
16
 
 
17
"""Tests for Knit data structure"""
 
18
 
 
19
from cStringIO import StringIO
 
20
import difflib
 
21
import gzip
 
22
import sha
 
23
import sys
 
24
 
 
25
from bzrlib import (
 
26
    errors,
 
27
    generate_ids,
 
28
    knit,
 
29
    pack,
 
30
    )
 
31
from bzrlib.errors import (
 
32
    RevisionAlreadyPresent,
 
33
    KnitHeaderError,
 
34
    RevisionNotPresent,
 
35
    NoSuchFile,
 
36
    )
 
37
from bzrlib.index import *
 
38
from bzrlib.knit import (
 
39
    KnitContent,
 
40
    KnitGraphIndex,
 
41
    KnitVersionedFile,
 
42
    KnitPlainFactory,
 
43
    KnitAnnotateFactory,
 
44
    _KnitAccess,
 
45
    _KnitData,
 
46
    _KnitIndex,
 
47
    _PackAccess,
 
48
    WeaveToKnit,
 
49
    KnitSequenceMatcher,
 
50
    )
 
51
from bzrlib.osutils import split_lines
 
52
from bzrlib.tests import (
 
53
    Feature,
 
54
    TestCase,
 
55
    TestCaseWithMemoryTransport,
 
56
    TestCaseWithTransport,
 
57
    )
 
58
from bzrlib.transport import TransportLogger, get_transport
 
59
from bzrlib.transport.memory import MemoryTransport
 
60
from bzrlib.util import bencode
 
61
from bzrlib.weave import Weave
 
62
 
 
63
 
 
64
class _CompiledKnitFeature(Feature):
 
65
 
 
66
    def _probe(self):
 
67
        try:
 
68
            import bzrlib._knit_load_data_c
 
69
        except ImportError:
 
70
            return False
 
71
        return True
 
72
 
 
73
    def feature_name(self):
 
74
        return 'bzrlib._knit_load_data_c'
 
75
 
 
76
CompiledKnitFeature = _CompiledKnitFeature()
 
77
 
 
78
 
 
79
class KnitContentTests(TestCase):
 
80
 
 
81
    def test_constructor(self):
 
82
        content = KnitContent([])
 
83
 
 
84
    def test_text(self):
 
85
        content = KnitContent([])
 
86
        self.assertEqual(content.text(), [])
 
87
 
 
88
        content = KnitContent([("origin1", "text1"), ("origin2", "text2")])
 
89
        self.assertEqual(content.text(), ["text1", "text2"])
 
90
 
 
91
    def test_annotate(self):
 
92
        content = KnitContent([])
 
93
        self.assertEqual(content.annotate(), [])
 
94
 
 
95
        content = KnitContent([("origin1", "text1"), ("origin2", "text2")])
 
96
        self.assertEqual(content.annotate(),
 
97
            [("origin1", "text1"), ("origin2", "text2")])
 
98
 
 
99
    def test_annotate_iter(self):
 
100
        content = KnitContent([])
 
101
        it = content.annotate_iter()
 
102
        self.assertRaises(StopIteration, it.next)
 
103
 
 
104
        content = KnitContent([("origin1", "text1"), ("origin2", "text2")])
 
105
        it = content.annotate_iter()
 
106
        self.assertEqual(it.next(), ("origin1", "text1"))
 
107
        self.assertEqual(it.next(), ("origin2", "text2"))
 
108
        self.assertRaises(StopIteration, it.next)
 
109
 
 
110
    def test_copy(self):
 
111
        content = KnitContent([("origin1", "text1"), ("origin2", "text2")])
 
112
        copy = content.copy()
 
113
        self.assertIsInstance(copy, KnitContent)
 
114
        self.assertEqual(copy.annotate(),
 
115
            [("origin1", "text1"), ("origin2", "text2")])
 
116
 
 
117
    def test_line_delta(self):
 
118
        content1 = KnitContent([("", "a"), ("", "b")])
 
119
        content2 = KnitContent([("", "a"), ("", "a"), ("", "c")])
 
120
        self.assertEqual(content1.line_delta(content2),
 
121
            [(1, 2, 2, [("", "a"), ("", "c")])])
 
122
 
 
123
    def test_line_delta_iter(self):
 
124
        content1 = KnitContent([("", "a"), ("", "b")])
 
125
        content2 = KnitContent([("", "a"), ("", "a"), ("", "c")])
 
126
        it = content1.line_delta_iter(content2)
 
127
        self.assertEqual(it.next(), (1, 2, 2, [("", "a"), ("", "c")]))
 
128
        self.assertRaises(StopIteration, it.next)
 
129
 
 
130
 
 
131
class MockTransport(object):
 
132
 
 
133
    def __init__(self, file_lines=None):
 
134
        self.file_lines = file_lines
 
135
        self.calls = []
 
136
        # We have no base directory for the MockTransport
 
137
        self.base = ''
 
138
 
 
139
    def get(self, filename):
 
140
        if self.file_lines is None:
 
141
            raise NoSuchFile(filename)
 
142
        else:
 
143
            return StringIO("\n".join(self.file_lines))
 
144
 
 
145
    def readv(self, relpath, offsets):
 
146
        fp = self.get(relpath)
 
147
        for offset, size in offsets:
 
148
            fp.seek(offset)
 
149
            yield offset, fp.read(size)
 
150
 
 
151
    def __getattr__(self, name):
 
152
        def queue_call(*args, **kwargs):
 
153
            self.calls.append((name, args, kwargs))
 
154
        return queue_call
 
155
 
 
156
 
 
157
class KnitRecordAccessTestsMixin(object):
 
158
    """Tests for getting and putting knit records."""
 
159
 
 
160
    def assertAccessExists(self, access):
 
161
        """Ensure the data area for access has been initialised/exists."""
 
162
        raise NotImplementedError(self.assertAccessExists)
 
163
 
 
164
    def test_add_raw_records(self):
 
165
        """Add_raw_records adds records retrievable later."""
 
166
        access = self.get_access()
 
167
        memos = access.add_raw_records([10], '1234567890')
 
168
        self.assertEqual(['1234567890'], list(access.get_raw_records(memos)))
 
169
 
 
170
    def test_add_several_raw_records(self):
 
171
        """add_raw_records with many records and read some back."""
 
172
        access = self.get_access()
 
173
        memos = access.add_raw_records([10, 2, 5], '12345678901234567')
 
174
        self.assertEqual(['1234567890', '12', '34567'],
 
175
            list(access.get_raw_records(memos)))
 
176
        self.assertEqual(['1234567890'],
 
177
            list(access.get_raw_records(memos[0:1])))
 
178
        self.assertEqual(['12'],
 
179
            list(access.get_raw_records(memos[1:2])))
 
180
        self.assertEqual(['34567'],
 
181
            list(access.get_raw_records(memos[2:3])))
 
182
        self.assertEqual(['1234567890', '34567'],
 
183
            list(access.get_raw_records(memos[0:1] + memos[2:3])))
 
184
 
 
185
    def test_create(self):
 
186
        """create() should make a file on disk."""
 
187
        access = self.get_access()
 
188
        access.create()
 
189
        self.assertAccessExists(access)
 
190
 
 
191
    def test_open_file(self):
 
192
        """open_file never errors."""
 
193
        access = self.get_access()
 
194
        access.open_file()
 
195
 
 
196
 
 
197
class TestKnitKnitAccess(TestCaseWithMemoryTransport, KnitRecordAccessTestsMixin):
 
198
    """Tests for the .kndx implementation."""
 
199
 
 
200
    def assertAccessExists(self, access):
 
201
        self.assertNotEqual(None, access.open_file())
 
202
 
 
203
    def get_access(self):
 
204
        """Get a .knit style access instance."""
 
205
        access = _KnitAccess(self.get_transport(), "foo.knit", None, None,
 
206
            False, False)
 
207
        return access
 
208
    
 
209
 
 
210
class TestPackKnitAccess(TestCaseWithMemoryTransport, KnitRecordAccessTestsMixin):
 
211
    """Tests for the pack based access."""
 
212
 
 
213
    def assertAccessExists(self, access):
 
214
        # as pack based access has no backing unless an index maps data, this
 
215
        # is a no-op.
 
216
        pass
 
217
 
 
218
    def get_access(self):
 
219
        return self._get_access()[0]
 
220
 
 
221
    def _get_access(self, packname='packfile', index='FOO'):
 
222
        transport = self.get_transport()
 
223
        def write_data(bytes):
 
224
            transport.append_bytes(packname, bytes)
 
225
        writer = pack.ContainerWriter(write_data)
 
226
        writer.begin()
 
227
        indices = {index:(transport, packname)}
 
228
        access = _PackAccess(indices, writer=(writer, index))
 
229
        return access, writer
 
230
 
 
231
    def test_read_from_several_packs(self):
 
232
        access, writer = self._get_access()
 
233
        memos = []
 
234
        memos.extend(access.add_raw_records([10], '1234567890'))
 
235
        writer.end()
 
236
        access, writer = self._get_access('pack2', 'FOOBAR')
 
237
        memos.extend(access.add_raw_records([5], '12345'))
 
238
        writer.end()
 
239
        access, writer = self._get_access('pack3', 'BAZ')
 
240
        memos.extend(access.add_raw_records([5], 'alpha'))
 
241
        writer.end()
 
242
        transport = self.get_transport()
 
243
        access = _PackAccess({"FOO":(transport, 'packfile'),
 
244
            "FOOBAR":(transport, 'pack2'),
 
245
            "BAZ":(transport, 'pack3')})
 
246
        self.assertEqual(['1234567890', '12345', 'alpha'],
 
247
            list(access.get_raw_records(memos)))
 
248
        self.assertEqual(['1234567890'],
 
249
            list(access.get_raw_records(memos[0:1])))
 
250
        self.assertEqual(['12345'],
 
251
            list(access.get_raw_records(memos[1:2])))
 
252
        self.assertEqual(['alpha'],
 
253
            list(access.get_raw_records(memos[2:3])))
 
254
        self.assertEqual(['1234567890', 'alpha'],
 
255
            list(access.get_raw_records(memos[0:1] + memos[2:3])))
 
256
 
 
257
    def test_set_writer(self):
 
258
        """The writer should be settable post construction."""
 
259
        access = _PackAccess({})
 
260
        transport = self.get_transport()
 
261
        packname = 'packfile'
 
262
        index = 'foo'
 
263
        def write_data(bytes):
 
264
            transport.append_bytes(packname, bytes)
 
265
        writer = pack.ContainerWriter(write_data)
 
266
        writer.begin()
 
267
        access.set_writer(writer, index, (transport, packname))
 
268
        memos = access.add_raw_records([10], '1234567890')
 
269
        writer.end()
 
270
        self.assertEqual(['1234567890'], list(access.get_raw_records(memos)))
 
271
 
 
272
 
 
273
class LowLevelKnitDataTests(TestCase):
 
274
 
 
275
    def create_gz_content(self, text):
 
276
        sio = StringIO()
 
277
        gz_file = gzip.GzipFile(mode='wb', fileobj=sio)
 
278
        gz_file.write(text)
 
279
        gz_file.close()
 
280
        return sio.getvalue()
 
281
 
 
282
    def test_valid_knit_data(self):
 
283
        sha1sum = sha.new('foo\nbar\n').hexdigest()
 
284
        gz_txt = self.create_gz_content('version rev-id-1 2 %s\n'
 
285
                                        'foo\n'
 
286
                                        'bar\n'
 
287
                                        'end rev-id-1\n'
 
288
                                        % (sha1sum,))
 
289
        transport = MockTransport([gz_txt])
 
290
        access = _KnitAccess(transport, 'filename', None, None, False, False)
 
291
        data = _KnitData(access=access)
 
292
        records = [('rev-id-1', (None, 0, len(gz_txt)))]
 
293
 
 
294
        contents = data.read_records(records)
 
295
        self.assertEqual({'rev-id-1':(['foo\n', 'bar\n'], sha1sum)}, contents)
 
296
 
 
297
        raw_contents = list(data.read_records_iter_raw(records))
 
298
        self.assertEqual([('rev-id-1', gz_txt)], raw_contents)
 
299
 
 
300
    def test_not_enough_lines(self):
 
301
        sha1sum = sha.new('foo\n').hexdigest()
 
302
        # record says 2 lines data says 1
 
303
        gz_txt = self.create_gz_content('version rev-id-1 2 %s\n'
 
304
                                        'foo\n'
 
305
                                        'end rev-id-1\n'
 
306
                                        % (sha1sum,))
 
307
        transport = MockTransport([gz_txt])
 
308
        access = _KnitAccess(transport, 'filename', None, None, False, False)
 
309
        data = _KnitData(access=access)
 
310
        records = [('rev-id-1', (None, 0, len(gz_txt)))]
 
311
        self.assertRaises(errors.KnitCorrupt, data.read_records, records)
 
312
 
 
313
        # read_records_iter_raw won't detect that sort of mismatch/corruption
 
314
        raw_contents = list(data.read_records_iter_raw(records))
 
315
        self.assertEqual([('rev-id-1', gz_txt)], raw_contents)
 
316
 
 
317
    def test_too_many_lines(self):
 
318
        sha1sum = sha.new('foo\nbar\n').hexdigest()
 
319
        # record says 1 lines data says 2
 
320
        gz_txt = self.create_gz_content('version rev-id-1 1 %s\n'
 
321
                                        'foo\n'
 
322
                                        'bar\n'
 
323
                                        'end rev-id-1\n'
 
324
                                        % (sha1sum,))
 
325
        transport = MockTransport([gz_txt])
 
326
        access = _KnitAccess(transport, 'filename', None, None, False, False)
 
327
        data = _KnitData(access=access)
 
328
        records = [('rev-id-1', (None, 0, len(gz_txt)))]
 
329
        self.assertRaises(errors.KnitCorrupt, data.read_records, records)
 
330
 
 
331
        # read_records_iter_raw won't detect that sort of mismatch/corruption
 
332
        raw_contents = list(data.read_records_iter_raw(records))
 
333
        self.assertEqual([('rev-id-1', gz_txt)], raw_contents)
 
334
 
 
335
    def test_mismatched_version_id(self):
 
336
        sha1sum = sha.new('foo\nbar\n').hexdigest()
 
337
        gz_txt = self.create_gz_content('version rev-id-1 2 %s\n'
 
338
                                        'foo\n'
 
339
                                        'bar\n'
 
340
                                        'end rev-id-1\n'
 
341
                                        % (sha1sum,))
 
342
        transport = MockTransport([gz_txt])
 
343
        access = _KnitAccess(transport, 'filename', None, None, False, False)
 
344
        data = _KnitData(access=access)
 
345
        # We are asking for rev-id-2, but the data is rev-id-1
 
346
        records = [('rev-id-2', (None, 0, len(gz_txt)))]
 
347
        self.assertRaises(errors.KnitCorrupt, data.read_records, records)
 
348
 
 
349
        # read_records_iter_raw will notice if we request the wrong version.
 
350
        self.assertRaises(errors.KnitCorrupt, list,
 
351
                          data.read_records_iter_raw(records))
 
352
 
 
353
    def test_uncompressed_data(self):
 
354
        sha1sum = sha.new('foo\nbar\n').hexdigest()
 
355
        txt = ('version rev-id-1 2 %s\n'
 
356
               'foo\n'
 
357
               'bar\n'
 
358
               'end rev-id-1\n'
 
359
               % (sha1sum,))
 
360
        transport = MockTransport([txt])
 
361
        access = _KnitAccess(transport, 'filename', None, None, False, False)
 
362
        data = _KnitData(access=access)
 
363
        records = [('rev-id-1', (None, 0, len(txt)))]
 
364
 
 
365
        # We don't have valid gzip data ==> corrupt
 
366
        self.assertRaises(errors.KnitCorrupt, data.read_records, records)
 
367
 
 
368
        # read_records_iter_raw will notice the bad data
 
369
        self.assertRaises(errors.KnitCorrupt, list,
 
370
                          data.read_records_iter_raw(records))
 
371
 
 
372
    def test_corrupted_data(self):
 
373
        sha1sum = sha.new('foo\nbar\n').hexdigest()
 
374
        gz_txt = self.create_gz_content('version rev-id-1 2 %s\n'
 
375
                                        'foo\n'
 
376
                                        'bar\n'
 
377
                                        'end rev-id-1\n'
 
378
                                        % (sha1sum,))
 
379
        # Change 2 bytes in the middle to \xff
 
380
        gz_txt = gz_txt[:10] + '\xff\xff' + gz_txt[12:]
 
381
        transport = MockTransport([gz_txt])
 
382
        access = _KnitAccess(transport, 'filename', None, None, False, False)
 
383
        data = _KnitData(access=access)
 
384
        records = [('rev-id-1', (None, 0, len(gz_txt)))]
 
385
 
 
386
        self.assertRaises(errors.KnitCorrupt, data.read_records, records)
 
387
 
 
388
        # read_records_iter_raw will notice if we request the wrong version.
 
389
        self.assertRaises(errors.KnitCorrupt, list,
 
390
                          data.read_records_iter_raw(records))
 
391
 
 
392
 
 
393
class LowLevelKnitIndexTests(TestCase):
 
394
 
 
395
    def get_knit_index(self, *args, **kwargs):
 
396
        orig = knit._load_data
 
397
        def reset():
 
398
            knit._load_data = orig
 
399
        self.addCleanup(reset)
 
400
        from bzrlib._knit_load_data_py import _load_data_py
 
401
        knit._load_data = _load_data_py
 
402
        return _KnitIndex(*args, **kwargs)
 
403
 
 
404
    def test_no_such_file(self):
 
405
        transport = MockTransport()
 
406
 
 
407
        self.assertRaises(NoSuchFile, self.get_knit_index,
 
408
                          transport, "filename", "r")
 
409
        self.assertRaises(NoSuchFile, self.get_knit_index,
 
410
                          transport, "filename", "w", create=False)
 
411
 
 
412
    def test_create_file(self):
 
413
        transport = MockTransport()
 
414
 
 
415
        index = self.get_knit_index(transport, "filename", "w",
 
416
            file_mode="wb", create=True)
 
417
        self.assertEqual(
 
418
                ("put_bytes_non_atomic",
 
419
                    ("filename", index.HEADER), {"mode": "wb"}),
 
420
                transport.calls.pop(0))
 
421
 
 
422
    def test_delay_create_file(self):
 
423
        transport = MockTransport()
 
424
 
 
425
        index = self.get_knit_index(transport, "filename", "w",
 
426
            create=True, file_mode="wb", create_parent_dir=True,
 
427
            delay_create=True, dir_mode=0777)
 
428
        self.assertEqual([], transport.calls)
 
429
 
 
430
        index.add_versions([])
 
431
        name, (filename, f), kwargs = transport.calls.pop(0)
 
432
        self.assertEqual("put_file_non_atomic", name)
 
433
        self.assertEqual(
 
434
            {"dir_mode": 0777, "create_parent_dir": True, "mode": "wb"},
 
435
            kwargs)
 
436
        self.assertEqual("filename", filename)
 
437
        self.assertEqual(index.HEADER, f.read())
 
438
 
 
439
        index.add_versions([])
 
440
        self.assertEqual(("append_bytes", ("filename", ""), {}),
 
441
            transport.calls.pop(0))
 
442
 
 
443
    def test_read_utf8_version_id(self):
 
444
        unicode_revision_id = u"version-\N{CYRILLIC CAPITAL LETTER A}"
 
445
        utf8_revision_id = unicode_revision_id.encode('utf-8')
 
446
        transport = MockTransport([
 
447
            _KnitIndex.HEADER,
 
448
            '%s option 0 1 :' % (utf8_revision_id,)
 
449
            ])
 
450
        index = self.get_knit_index(transport, "filename", "r")
 
451
        # _KnitIndex is a private class, and deals in utf8 revision_ids, not
 
452
        # Unicode revision_ids.
 
453
        self.assertTrue(index.has_version(utf8_revision_id))
 
454
        self.assertFalse(index.has_version(unicode_revision_id))
 
455
 
 
456
    def test_read_utf8_parents(self):
 
457
        unicode_revision_id = u"version-\N{CYRILLIC CAPITAL LETTER A}"
 
458
        utf8_revision_id = unicode_revision_id.encode('utf-8')
 
459
        transport = MockTransport([
 
460
            _KnitIndex.HEADER,
 
461
            "version option 0 1 .%s :" % (utf8_revision_id,)
 
462
            ])
 
463
        index = self.get_knit_index(transport, "filename", "r")
 
464
        self.assertEqual([utf8_revision_id],
 
465
            index.get_parents_with_ghosts("version"))
 
466
 
 
467
    def test_read_ignore_corrupted_lines(self):
 
468
        transport = MockTransport([
 
469
            _KnitIndex.HEADER,
 
470
            "corrupted",
 
471
            "corrupted options 0 1 .b .c ",
 
472
            "version options 0 1 :"
 
473
            ])
 
474
        index = self.get_knit_index(transport, "filename", "r")
 
475
        self.assertEqual(1, index.num_versions())
 
476
        self.assertTrue(index.has_version("version"))
 
477
 
 
478
    def test_read_corrupted_header(self):
 
479
        transport = MockTransport(['not a bzr knit index header\n'])
 
480
        self.assertRaises(KnitHeaderError,
 
481
            self.get_knit_index, transport, "filename", "r")
 
482
 
 
483
    def test_read_duplicate_entries(self):
 
484
        transport = MockTransport([
 
485
            _KnitIndex.HEADER,
 
486
            "parent options 0 1 :",
 
487
            "version options1 0 1 0 :",
 
488
            "version options2 1 2 .other :",
 
489
            "version options3 3 4 0 .other :"
 
490
            ])
 
491
        index = self.get_knit_index(transport, "filename", "r")
 
492
        self.assertEqual(2, index.num_versions())
 
493
        # check that the index used is the first one written. (Specific
 
494
        # to KnitIndex style indices.
 
495
        self.assertEqual("1", index._version_list_to_index(["version"]))
 
496
        self.assertEqual((None, 3, 4), index.get_position("version"))
 
497
        self.assertEqual(["options3"], index.get_options("version"))
 
498
        self.assertEqual(["parent", "other"],
 
499
            index.get_parents_with_ghosts("version"))
 
500
 
 
501
    def test_read_compressed_parents(self):
 
502
        transport = MockTransport([
 
503
            _KnitIndex.HEADER,
 
504
            "a option 0 1 :",
 
505
            "b option 0 1 0 :",
 
506
            "c option 0 1 1 0 :",
 
507
            ])
 
508
        index = self.get_knit_index(transport, "filename", "r")
 
509
        self.assertEqual(["a"], index.get_parents("b"))
 
510
        self.assertEqual(["b", "a"], index.get_parents("c"))
 
511
 
 
512
    def test_write_utf8_version_id(self):
 
513
        unicode_revision_id = u"version-\N{CYRILLIC CAPITAL LETTER A}"
 
514
        utf8_revision_id = unicode_revision_id.encode('utf-8')
 
515
        transport = MockTransport([
 
516
            _KnitIndex.HEADER
 
517
            ])
 
518
        index = self.get_knit_index(transport, "filename", "r")
 
519
        index.add_version(utf8_revision_id, ["option"], (None, 0, 1), [])
 
520
        self.assertEqual(("append_bytes", ("filename",
 
521
            "\n%s option 0 1  :" % (utf8_revision_id,)),
 
522
            {}),
 
523
            transport.calls.pop(0))
 
524
 
 
525
    def test_write_utf8_parents(self):
 
526
        unicode_revision_id = u"version-\N{CYRILLIC CAPITAL LETTER A}"
 
527
        utf8_revision_id = unicode_revision_id.encode('utf-8')
 
528
        transport = MockTransport([
 
529
            _KnitIndex.HEADER
 
530
            ])
 
531
        index = self.get_knit_index(transport, "filename", "r")
 
532
        index.add_version("version", ["option"], (None, 0, 1), [utf8_revision_id])
 
533
        self.assertEqual(("append_bytes", ("filename",
 
534
            "\nversion option 0 1 .%s :" % (utf8_revision_id,)),
 
535
            {}),
 
536
            transport.calls.pop(0))
 
537
 
 
538
    def test_get_graph(self):
 
539
        transport = MockTransport()
 
540
        index = self.get_knit_index(transport, "filename", "w", create=True)
 
541
        self.assertEqual([], index.get_graph())
 
542
 
 
543
        index.add_version("a", ["option"], (None, 0, 1), ["b"])
 
544
        self.assertEqual([("a", ["b"])], index.get_graph())
 
545
 
 
546
        index.add_version("c", ["option"], (None, 0, 1), ["d"])
 
547
        self.assertEqual([("a", ["b"]), ("c", ["d"])],
 
548
            sorted(index.get_graph()))
 
549
 
 
550
    def test_get_ancestry(self):
 
551
        transport = MockTransport([
 
552
            _KnitIndex.HEADER,
 
553
            "a option 0 1 :",
 
554
            "b option 0 1 0 .e :",
 
555
            "c option 0 1 1 0 :",
 
556
            "d option 0 1 2 .f :"
 
557
            ])
 
558
        index = self.get_knit_index(transport, "filename", "r")
 
559
 
 
560
        self.assertEqual([], index.get_ancestry([]))
 
561
        self.assertEqual(["a"], index.get_ancestry(["a"]))
 
562
        self.assertEqual(["a", "b"], index.get_ancestry(["b"]))
 
563
        self.assertEqual(["a", "b", "c"], index.get_ancestry(["c"]))
 
564
        self.assertEqual(["a", "b", "c", "d"], index.get_ancestry(["d"]))
 
565
        self.assertEqual(["a", "b"], index.get_ancestry(["a", "b"]))
 
566
        self.assertEqual(["a", "b", "c"], index.get_ancestry(["a", "c"]))
 
567
 
 
568
        self.assertRaises(RevisionNotPresent, index.get_ancestry, ["e"])
 
569
 
 
570
    def test_get_ancestry_with_ghosts(self):
 
571
        transport = MockTransport([
 
572
            _KnitIndex.HEADER,
 
573
            "a option 0 1 :",
 
574
            "b option 0 1 0 .e :",
 
575
            "c option 0 1 0 .f .g :",
 
576
            "d option 0 1 2 .h .j .k :"
 
577
            ])
 
578
        index = self.get_knit_index(transport, "filename", "r")
 
579
 
 
580
        self.assertEqual([], index.get_ancestry_with_ghosts([]))
 
581
        self.assertEqual(["a"], index.get_ancestry_with_ghosts(["a"]))
 
582
        self.assertEqual(["a", "e", "b"],
 
583
            index.get_ancestry_with_ghosts(["b"]))
 
584
        self.assertEqual(["a", "g", "f", "c"],
 
585
            index.get_ancestry_with_ghosts(["c"]))
 
586
        self.assertEqual(["a", "g", "f", "c", "k", "j", "h", "d"],
 
587
            index.get_ancestry_with_ghosts(["d"]))
 
588
        self.assertEqual(["a", "e", "b"],
 
589
            index.get_ancestry_with_ghosts(["a", "b"]))
 
590
        self.assertEqual(["a", "g", "f", "c"],
 
591
            index.get_ancestry_with_ghosts(["a", "c"]))
 
592
        self.assertEqual(
 
593
            ["a", "g", "f", "c", "e", "b", "k", "j", "h", "d"],
 
594
            index.get_ancestry_with_ghosts(["b", "d"]))
 
595
 
 
596
        self.assertRaises(RevisionNotPresent,
 
597
            index.get_ancestry_with_ghosts, ["e"])
 
598
 
 
599
    def test_iter_parents(self):
 
600
        transport = MockTransport()
 
601
        index = self.get_knit_index(transport, "filename", "w", create=True)
 
602
        # no parents
 
603
        index.add_version('r0', ['option'], (None, 0, 1), [])
 
604
        # 1 parent
 
605
        index.add_version('r1', ['option'], (None, 0, 1), ['r0'])
 
606
        # 2 parents
 
607
        index.add_version('r2', ['option'], (None, 0, 1), ['r1', 'r0'])
 
608
        # XXX TODO a ghost
 
609
        # cases: each sample data individually:
 
610
        self.assertEqual(set([('r0', ())]),
 
611
            set(index.iter_parents(['r0'])))
 
612
        self.assertEqual(set([('r1', ('r0', ))]),
 
613
            set(index.iter_parents(['r1'])))
 
614
        self.assertEqual(set([('r2', ('r1', 'r0'))]),
 
615
            set(index.iter_parents(['r2'])))
 
616
        # no nodes returned for a missing node
 
617
        self.assertEqual(set(),
 
618
            set(index.iter_parents(['missing'])))
 
619
        # 1 node returned with missing nodes skipped
 
620
        self.assertEqual(set([('r1', ('r0', ))]),
 
621
            set(index.iter_parents(['ghost1', 'r1', 'ghost'])))
 
622
        # 2 nodes returned
 
623
        self.assertEqual(set([('r0', ()), ('r1', ('r0', ))]),
 
624
            set(index.iter_parents(['r0', 'r1'])))
 
625
        # 2 nodes returned, missing skipped
 
626
        self.assertEqual(set([('r0', ()), ('r1', ('r0', ))]),
 
627
            set(index.iter_parents(['a', 'r0', 'b', 'r1', 'c'])))
 
628
 
 
629
    def test_num_versions(self):
 
630
        transport = MockTransport([
 
631
            _KnitIndex.HEADER
 
632
            ])
 
633
        index = self.get_knit_index(transport, "filename", "r")
 
634
 
 
635
        self.assertEqual(0, index.num_versions())
 
636
        self.assertEqual(0, len(index))
 
637
 
 
638
        index.add_version("a", ["option"], (None, 0, 1), [])
 
639
        self.assertEqual(1, index.num_versions())
 
640
        self.assertEqual(1, len(index))
 
641
 
 
642
        index.add_version("a", ["option2"], (None, 1, 2), [])
 
643
        self.assertEqual(1, index.num_versions())
 
644
        self.assertEqual(1, len(index))
 
645
 
 
646
        index.add_version("b", ["option"], (None, 0, 1), [])
 
647
        self.assertEqual(2, index.num_versions())
 
648
        self.assertEqual(2, len(index))
 
649
 
 
650
    def test_get_versions(self):
 
651
        transport = MockTransport([
 
652
            _KnitIndex.HEADER
 
653
            ])
 
654
        index = self.get_knit_index(transport, "filename", "r")
 
655
 
 
656
        self.assertEqual([], index.get_versions())
 
657
 
 
658
        index.add_version("a", ["option"], (None, 0, 1), [])
 
659
        self.assertEqual(["a"], index.get_versions())
 
660
 
 
661
        index.add_version("a", ["option"], (None, 0, 1), [])
 
662
        self.assertEqual(["a"], index.get_versions())
 
663
 
 
664
        index.add_version("b", ["option"], (None, 0, 1), [])
 
665
        self.assertEqual(["a", "b"], index.get_versions())
 
666
 
 
667
    def test_add_version(self):
 
668
        transport = MockTransport([
 
669
            _KnitIndex.HEADER
 
670
            ])
 
671
        index = self.get_knit_index(transport, "filename", "r")
 
672
 
 
673
        index.add_version("a", ["option"], (None, 0, 1), ["b"])
 
674
        self.assertEqual(("append_bytes",
 
675
            ("filename", "\na option 0 1 .b :"),
 
676
            {}), transport.calls.pop(0))
 
677
        self.assertTrue(index.has_version("a"))
 
678
        self.assertEqual(1, index.num_versions())
 
679
        self.assertEqual((None, 0, 1), index.get_position("a"))
 
680
        self.assertEqual(["option"], index.get_options("a"))
 
681
        self.assertEqual(["b"], index.get_parents_with_ghosts("a"))
 
682
 
 
683
        index.add_version("a", ["opt"], (None, 1, 2), ["c"])
 
684
        self.assertEqual(("append_bytes",
 
685
            ("filename", "\na opt 1 2 .c :"),
 
686
            {}), transport.calls.pop(0))
 
687
        self.assertTrue(index.has_version("a"))
 
688
        self.assertEqual(1, index.num_versions())
 
689
        self.assertEqual((None, 1, 2), index.get_position("a"))
 
690
        self.assertEqual(["opt"], index.get_options("a"))
 
691
        self.assertEqual(["c"], index.get_parents_with_ghosts("a"))
 
692
 
 
693
        index.add_version("b", ["option"], (None, 2, 3), ["a"])
 
694
        self.assertEqual(("append_bytes",
 
695
            ("filename", "\nb option 2 3 0 :"),
 
696
            {}), transport.calls.pop(0))
 
697
        self.assertTrue(index.has_version("b"))
 
698
        self.assertEqual(2, index.num_versions())
 
699
        self.assertEqual((None, 2, 3), index.get_position("b"))
 
700
        self.assertEqual(["option"], index.get_options("b"))
 
701
        self.assertEqual(["a"], index.get_parents_with_ghosts("b"))
 
702
 
 
703
    def test_add_versions(self):
 
704
        transport = MockTransport([
 
705
            _KnitIndex.HEADER
 
706
            ])
 
707
        index = self.get_knit_index(transport, "filename", "r")
 
708
 
 
709
        index.add_versions([
 
710
            ("a", ["option"], (None, 0, 1), ["b"]),
 
711
            ("a", ["opt"], (None, 1, 2), ["c"]),
 
712
            ("b", ["option"], (None, 2, 3), ["a"])
 
713
            ])
 
714
        self.assertEqual(("append_bytes", ("filename",
 
715
            "\na option 0 1 .b :"
 
716
            "\na opt 1 2 .c :"
 
717
            "\nb option 2 3 0 :"
 
718
            ), {}), transport.calls.pop(0))
 
719
        self.assertTrue(index.has_version("a"))
 
720
        self.assertTrue(index.has_version("b"))
 
721
        self.assertEqual(2, index.num_versions())
 
722
        self.assertEqual((None, 1, 2), index.get_position("a"))
 
723
        self.assertEqual((None, 2, 3), index.get_position("b"))
 
724
        self.assertEqual(["opt"], index.get_options("a"))
 
725
        self.assertEqual(["option"], index.get_options("b"))
 
726
        self.assertEqual(["c"], index.get_parents_with_ghosts("a"))
 
727
        self.assertEqual(["a"], index.get_parents_with_ghosts("b"))
 
728
 
 
729
    def test_delay_create_and_add_versions(self):
 
730
        transport = MockTransport()
 
731
 
 
732
        index = self.get_knit_index(transport, "filename", "w",
 
733
            create=True, file_mode="wb", create_parent_dir=True,
 
734
            delay_create=True, dir_mode=0777)
 
735
        self.assertEqual([], transport.calls)
 
736
 
 
737
        index.add_versions([
 
738
            ("a", ["option"], (None, 0, 1), ["b"]),
 
739
            ("a", ["opt"], (None, 1, 2), ["c"]),
 
740
            ("b", ["option"], (None, 2, 3), ["a"])
 
741
            ])
 
742
        name, (filename, f), kwargs = transport.calls.pop(0)
 
743
        self.assertEqual("put_file_non_atomic", name)
 
744
        self.assertEqual(
 
745
            {"dir_mode": 0777, "create_parent_dir": True, "mode": "wb"},
 
746
            kwargs)
 
747
        self.assertEqual("filename", filename)
 
748
        self.assertEqual(
 
749
            index.HEADER +
 
750
            "\na option 0 1 .b :"
 
751
            "\na opt 1 2 .c :"
 
752
            "\nb option 2 3 0 :",
 
753
            f.read())
 
754
 
 
755
    def test_has_version(self):
 
756
        transport = MockTransport([
 
757
            _KnitIndex.HEADER,
 
758
            "a option 0 1 :"
 
759
            ])
 
760
        index = self.get_knit_index(transport, "filename", "r")
 
761
 
 
762
        self.assertTrue(index.has_version("a"))
 
763
        self.assertFalse(index.has_version("b"))
 
764
 
 
765
    def test_get_position(self):
 
766
        transport = MockTransport([
 
767
            _KnitIndex.HEADER,
 
768
            "a option 0 1 :",
 
769
            "b option 1 2 :"
 
770
            ])
 
771
        index = self.get_knit_index(transport, "filename", "r")
 
772
 
 
773
        self.assertEqual((None, 0, 1), index.get_position("a"))
 
774
        self.assertEqual((None, 1, 2), index.get_position("b"))
 
775
 
 
776
    def test_get_method(self):
 
777
        transport = MockTransport([
 
778
            _KnitIndex.HEADER,
 
779
            "a fulltext,unknown 0 1 :",
 
780
            "b unknown,line-delta 1 2 :",
 
781
            "c bad 3 4 :"
 
782
            ])
 
783
        index = self.get_knit_index(transport, "filename", "r")
 
784
 
 
785
        self.assertEqual("fulltext", index.get_method("a"))
 
786
        self.assertEqual("line-delta", index.get_method("b"))
 
787
        self.assertRaises(errors.KnitIndexUnknownMethod, index.get_method, "c")
 
788
 
 
789
    def test_get_options(self):
 
790
        transport = MockTransport([
 
791
            _KnitIndex.HEADER,
 
792
            "a opt1 0 1 :",
 
793
            "b opt2,opt3 1 2 :"
 
794
            ])
 
795
        index = self.get_knit_index(transport, "filename", "r")
 
796
 
 
797
        self.assertEqual(["opt1"], index.get_options("a"))
 
798
        self.assertEqual(["opt2", "opt3"], index.get_options("b"))
 
799
 
 
800
    def test_get_parents(self):
 
801
        transport = MockTransport([
 
802
            _KnitIndex.HEADER,
 
803
            "a option 0 1 :",
 
804
            "b option 1 2 0 .c :",
 
805
            "c option 1 2 1 0 .e :"
 
806
            ])
 
807
        index = self.get_knit_index(transport, "filename", "r")
 
808
 
 
809
        self.assertEqual([], index.get_parents("a"))
 
810
        self.assertEqual(["a", "c"], index.get_parents("b"))
 
811
        self.assertEqual(["b", "a"], index.get_parents("c"))
 
812
 
 
813
    def test_get_parents_with_ghosts(self):
 
814
        transport = MockTransport([
 
815
            _KnitIndex.HEADER,
 
816
            "a option 0 1 :",
 
817
            "b option 1 2 0 .c :",
 
818
            "c option 1 2 1 0 .e :"
 
819
            ])
 
820
        index = self.get_knit_index(transport, "filename", "r")
 
821
 
 
822
        self.assertEqual([], index.get_parents_with_ghosts("a"))
 
823
        self.assertEqual(["a", "c"], index.get_parents_with_ghosts("b"))
 
824
        self.assertEqual(["b", "a", "e"],
 
825
            index.get_parents_with_ghosts("c"))
 
826
 
 
827
    def test_check_versions_present(self):
 
828
        transport = MockTransport([
 
829
            _KnitIndex.HEADER,
 
830
            "a option 0 1 :",
 
831
            "b option 0 1 :"
 
832
            ])
 
833
        index = self.get_knit_index(transport, "filename", "r")
 
834
 
 
835
        check = index.check_versions_present
 
836
 
 
837
        check([])
 
838
        check(["a"])
 
839
        check(["b"])
 
840
        check(["a", "b"])
 
841
        self.assertRaises(RevisionNotPresent, check, ["c"])
 
842
        self.assertRaises(RevisionNotPresent, check, ["a", "b", "c"])
 
843
 
 
844
    def test_impossible_parent(self):
 
845
        """Test we get KnitCorrupt if the parent couldn't possibly exist."""
 
846
        transport = MockTransport([
 
847
            _KnitIndex.HEADER,
 
848
            "a option 0 1 :",
 
849
            "b option 0 1 4 :"  # We don't have a 4th record
 
850
            ])
 
851
        try:
 
852
            self.assertRaises(errors.KnitCorrupt,
 
853
                              self.get_knit_index, transport, 'filename', 'r')
 
854
        except TypeError, e:
 
855
            if (str(e) == ('exceptions must be strings, classes, or instances,'
 
856
                           ' not exceptions.IndexError')
 
857
                and sys.version_info[0:2] >= (2,5)):
 
858
                self.knownFailure('Pyrex <0.9.5 fails with TypeError when'
 
859
                                  ' raising new style exceptions with python'
 
860
                                  ' >=2.5')
 
861
            else:
 
862
                raise
 
863
 
 
864
    def test_corrupted_parent(self):
 
865
        transport = MockTransport([
 
866
            _KnitIndex.HEADER,
 
867
            "a option 0 1 :",
 
868
            "b option 0 1 :",
 
869
            "c option 0 1 1v :", # Can't have a parent of '1v'
 
870
            ])
 
871
        try:
 
872
            self.assertRaises(errors.KnitCorrupt,
 
873
                              self.get_knit_index, transport, 'filename', 'r')
 
874
        except TypeError, e:
 
875
            if (str(e) == ('exceptions must be strings, classes, or instances,'
 
876
                           ' not exceptions.ValueError')
 
877
                and sys.version_info[0:2] >= (2,5)):
 
878
                self.knownFailure('Pyrex <0.9.5 fails with TypeError when'
 
879
                                  ' raising new style exceptions with python'
 
880
                                  ' >=2.5')
 
881
            else:
 
882
                raise
 
883
 
 
884
    def test_corrupted_parent_in_list(self):
 
885
        transport = MockTransport([
 
886
            _KnitIndex.HEADER,
 
887
            "a option 0 1 :",
 
888
            "b option 0 1 :",
 
889
            "c option 0 1 1 v :", # Can't have a parent of 'v'
 
890
            ])
 
891
        try:
 
892
            self.assertRaises(errors.KnitCorrupt,
 
893
                              self.get_knit_index, transport, 'filename', 'r')
 
894
        except TypeError, e:
 
895
            if (str(e) == ('exceptions must be strings, classes, or instances,'
 
896
                           ' not exceptions.ValueError')
 
897
                and sys.version_info[0:2] >= (2,5)):
 
898
                self.knownFailure('Pyrex <0.9.5 fails with TypeError when'
 
899
                                  ' raising new style exceptions with python'
 
900
                                  ' >=2.5')
 
901
            else:
 
902
                raise
 
903
 
 
904
    def test_invalid_position(self):
 
905
        transport = MockTransport([
 
906
            _KnitIndex.HEADER,
 
907
            "a option 1v 1 :",
 
908
            ])
 
909
        try:
 
910
            self.assertRaises(errors.KnitCorrupt,
 
911
                              self.get_knit_index, transport, 'filename', 'r')
 
912
        except TypeError, e:
 
913
            if (str(e) == ('exceptions must be strings, classes, or instances,'
 
914
                           ' not exceptions.ValueError')
 
915
                and sys.version_info[0:2] >= (2,5)):
 
916
                self.knownFailure('Pyrex <0.9.5 fails with TypeError when'
 
917
                                  ' raising new style exceptions with python'
 
918
                                  ' >=2.5')
 
919
            else:
 
920
                raise
 
921
 
 
922
    def test_invalid_size(self):
 
923
        transport = MockTransport([
 
924
            _KnitIndex.HEADER,
 
925
            "a option 1 1v :",
 
926
            ])
 
927
        try:
 
928
            self.assertRaises(errors.KnitCorrupt,
 
929
                              self.get_knit_index, transport, 'filename', 'r')
 
930
        except TypeError, e:
 
931
            if (str(e) == ('exceptions must be strings, classes, or instances,'
 
932
                           ' not exceptions.ValueError')
 
933
                and sys.version_info[0:2] >= (2,5)):
 
934
                self.knownFailure('Pyrex <0.9.5 fails with TypeError when'
 
935
                                  ' raising new style exceptions with python'
 
936
                                  ' >=2.5')
 
937
            else:
 
938
                raise
 
939
 
 
940
    def test_short_line(self):
 
941
        transport = MockTransport([
 
942
            _KnitIndex.HEADER,
 
943
            "a option 0 10  :",
 
944
            "b option 10 10 0", # This line isn't terminated, ignored
 
945
            ])
 
946
        index = self.get_knit_index(transport, "filename", "r")
 
947
        self.assertEqual(['a'], index.get_versions())
 
948
 
 
949
    def test_skip_incomplete_record(self):
 
950
        # A line with bogus data should just be skipped
 
951
        transport = MockTransport([
 
952
            _KnitIndex.HEADER,
 
953
            "a option 0 10  :",
 
954
            "b option 10 10 0", # This line isn't terminated, ignored
 
955
            "c option 20 10 0 :", # Properly terminated, and starts with '\n'
 
956
            ])
 
957
        index = self.get_knit_index(transport, "filename", "r")
 
958
        self.assertEqual(['a', 'c'], index.get_versions())
 
959
 
 
960
    def test_trailing_characters(self):
 
961
        # A line with bogus data should just be skipped
 
962
        transport = MockTransport([
 
963
            _KnitIndex.HEADER,
 
964
            "a option 0 10  :",
 
965
            "b option 10 10 0 :a", # This line has extra trailing characters
 
966
            "c option 20 10 0 :", # Properly terminated, and starts with '\n'
 
967
            ])
 
968
        index = self.get_knit_index(transport, "filename", "r")
 
969
        self.assertEqual(['a', 'c'], index.get_versions())
 
970
 
 
971
 
 
972
class LowLevelKnitIndexTests_c(LowLevelKnitIndexTests):
 
973
 
 
974
    _test_needs_features = [CompiledKnitFeature]
 
975
 
 
976
    def get_knit_index(self, *args, **kwargs):
 
977
        orig = knit._load_data
 
978
        def reset():
 
979
            knit._load_data = orig
 
980
        self.addCleanup(reset)
 
981
        from bzrlib._knit_load_data_c import _load_data_c
 
982
        knit._load_data = _load_data_c
 
983
        return _KnitIndex(*args, **kwargs)
 
984
 
 
985
 
 
986
 
 
987
class KnitTests(TestCaseWithTransport):
 
988
    """Class containing knit test helper routines."""
 
989
 
 
990
    def make_test_knit(self, annotate=False, delay_create=False, index=None,
 
991
                       name='test'):
 
992
        if not annotate:
 
993
            factory = KnitPlainFactory()
 
994
        else:
 
995
            factory = None
 
996
        return KnitVersionedFile(name, get_transport('.'), access_mode='w',
 
997
                                 factory=factory, create=True,
 
998
                                 delay_create=delay_create, index=index)
 
999
 
 
1000
    def assertRecordContentEqual(self, knit, version_id, candidate_content):
 
1001
        """Assert that some raw record content matches the raw record content
 
1002
        for a particular version_id in the given knit.
 
1003
        """
 
1004
        index_memo = knit._index.get_position(version_id)
 
1005
        record = (version_id, index_memo)
 
1006
        [(_, expected_content)] = list(knit._data.read_records_iter_raw([record]))
 
1007
        self.assertEqual(expected_content, candidate_content)
 
1008
 
 
1009
 
 
1010
class BasicKnitTests(KnitTests):
 
1011
 
 
1012
    def add_stock_one_and_one_a(self, k):
 
1013
        k.add_lines('text-1', [], split_lines(TEXT_1))
 
1014
        k.add_lines('text-1a', ['text-1'], split_lines(TEXT_1A))
 
1015
 
 
1016
    def test_knit_constructor(self):
 
1017
        """Construct empty k"""
 
1018
        self.make_test_knit()
 
1019
 
 
1020
    def test_make_explicit_index(self):
 
1021
        """We can supply an index to use."""
 
1022
        knit = KnitVersionedFile('test', get_transport('.'),
 
1023
            index='strangelove')
 
1024
        self.assertEqual(knit._index, 'strangelove')
 
1025
 
 
1026
    def test_knit_add(self):
 
1027
        """Store one text in knit and retrieve"""
 
1028
        k = self.make_test_knit()
 
1029
        k.add_lines('text-1', [], split_lines(TEXT_1))
 
1030
        self.assertTrue(k.has_version('text-1'))
 
1031
        self.assertEqualDiff(''.join(k.get_lines('text-1')), TEXT_1)
 
1032
 
 
1033
    def test_knit_reload(self):
 
1034
        # test that the content in a reloaded knit is correct
 
1035
        k = self.make_test_knit()
 
1036
        k.add_lines('text-1', [], split_lines(TEXT_1))
 
1037
        del k
 
1038
        k2 = KnitVersionedFile('test', get_transport('.'), access_mode='r', factory=KnitPlainFactory(), create=True)
 
1039
        self.assertTrue(k2.has_version('text-1'))
 
1040
        self.assertEqualDiff(''.join(k2.get_lines('text-1')), TEXT_1)
 
1041
 
 
1042
    def test_knit_several(self):
 
1043
        """Store several texts in a knit"""
 
1044
        k = self.make_test_knit()
 
1045
        k.add_lines('text-1', [], split_lines(TEXT_1))
 
1046
        k.add_lines('text-2', [], split_lines(TEXT_2))
 
1047
        self.assertEqualDiff(''.join(k.get_lines('text-1')), TEXT_1)
 
1048
        self.assertEqualDiff(''.join(k.get_lines('text-2')), TEXT_2)
 
1049
        
 
1050
    def test_repeated_add(self):
 
1051
        """Knit traps attempt to replace existing version"""
 
1052
        k = self.make_test_knit()
 
1053
        k.add_lines('text-1', [], split_lines(TEXT_1))
 
1054
        self.assertRaises(RevisionAlreadyPresent, 
 
1055
                k.add_lines,
 
1056
                'text-1', [], split_lines(TEXT_1))
 
1057
 
 
1058
    def test_empty(self):
 
1059
        k = self.make_test_knit(True)
 
1060
        k.add_lines('text-1', [], [])
 
1061
        self.assertEquals(k.get_lines('text-1'), [])
 
1062
 
 
1063
    def test_incomplete(self):
 
1064
        """Test if texts without a ending line-end can be inserted and
 
1065
        extracted."""
 
1066
        k = KnitVersionedFile('test', get_transport('.'), delta=False, create=True)
 
1067
        k.add_lines('text-1', [], ['a\n',    'b'  ])
 
1068
        k.add_lines('text-2', ['text-1'], ['a\rb\n', 'b\n'])
 
1069
        # reopening ensures maximum room for confusion
 
1070
        k = KnitVersionedFile('test', get_transport('.'), delta=False, create=True)
 
1071
        self.assertEquals(k.get_lines('text-1'), ['a\n',    'b'  ])
 
1072
        self.assertEquals(k.get_lines('text-2'), ['a\rb\n', 'b\n'])
 
1073
 
 
1074
    def test_delta(self):
 
1075
        """Expression of knit delta as lines"""
 
1076
        k = self.make_test_knit()
 
1077
        KnitContent
 
1078
        td = list(line_delta(TEXT_1.splitlines(True),
 
1079
                             TEXT_1A.splitlines(True)))
 
1080
        self.assertEqualDiff(''.join(td), delta_1_1a)
 
1081
        out = apply_line_delta(TEXT_1.splitlines(True), td)
 
1082
        self.assertEqualDiff(''.join(out), TEXT_1A)
 
1083
 
 
1084
    def assertDerivedBlocksEqual(self, source, target, noeol=False):
 
1085
        """Assert that the derived matching blocks match real output"""
 
1086
        source_lines = source.splitlines(True)
 
1087
        target_lines = target.splitlines(True)
 
1088
        def nl(line):
 
1089
            if noeol and not line.endswith('\n'):
 
1090
                return line + '\n'
 
1091
            else:
 
1092
                return line
 
1093
        source_content = KnitContent([(None, nl(l)) for l in source_lines])
 
1094
        target_content = KnitContent([(None, nl(l)) for l in target_lines])
 
1095
        line_delta = source_content.line_delta(target_content)
 
1096
        delta_blocks = list(KnitContent.get_line_delta_blocks(line_delta,
 
1097
            source_lines, target_lines))
 
1098
        matcher = KnitSequenceMatcher(None, source_lines, target_lines)
 
1099
        matcher_blocks = list(list(matcher.get_matching_blocks()))
 
1100
        self.assertEqual(matcher_blocks, delta_blocks)
 
1101
 
 
1102
    def test_get_line_delta_blocks(self):
 
1103
        self.assertDerivedBlocksEqual('a\nb\nc\n', 'q\nc\n')
 
1104
        self.assertDerivedBlocksEqual(TEXT_1, TEXT_1)
 
1105
        self.assertDerivedBlocksEqual(TEXT_1, TEXT_1A)
 
1106
        self.assertDerivedBlocksEqual(TEXT_1, TEXT_1B)
 
1107
        self.assertDerivedBlocksEqual(TEXT_1B, TEXT_1A)
 
1108
        self.assertDerivedBlocksEqual(TEXT_1A, TEXT_1B)
 
1109
        self.assertDerivedBlocksEqual(TEXT_1A, '')
 
1110
        self.assertDerivedBlocksEqual('', TEXT_1A)
 
1111
        self.assertDerivedBlocksEqual('', '')
 
1112
        self.assertDerivedBlocksEqual('a\nb\nc', 'a\nb\nc\nd')
 
1113
 
 
1114
    def test_get_line_delta_blocks_noeol(self):
 
1115
        """Handle historical knit deltas safely
 
1116
 
 
1117
        Some existing knit deltas don't consider the last line to differ
 
1118
        when the only difference whether it has a final newline.
 
1119
 
 
1120
        New knit deltas appear to always consider the last line to differ
 
1121
        in this case.
 
1122
        """
 
1123
        self.assertDerivedBlocksEqual('a\nb\nc', 'a\nb\nc\nd\n', noeol=True)
 
1124
        self.assertDerivedBlocksEqual('a\nb\nc\nd\n', 'a\nb\nc', noeol=True)
 
1125
        self.assertDerivedBlocksEqual('a\nb\nc\n', 'a\nb\nc', noeol=True)
 
1126
        self.assertDerivedBlocksEqual('a\nb\nc', 'a\nb\nc\n', noeol=True)
 
1127
 
 
1128
    def test_add_with_parents(self):
 
1129
        """Store in knit with parents"""
 
1130
        k = self.make_test_knit()
 
1131
        self.add_stock_one_and_one_a(k)
 
1132
        self.assertEquals(k.get_parents('text-1'), [])
 
1133
        self.assertEquals(k.get_parents('text-1a'), ['text-1'])
 
1134
 
 
1135
    def test_ancestry(self):
 
1136
        """Store in knit with parents"""
 
1137
        k = self.make_test_knit()
 
1138
        self.add_stock_one_and_one_a(k)
 
1139
        self.assertEquals(set(k.get_ancestry(['text-1a'])), set(['text-1a', 'text-1']))
 
1140
 
 
1141
    def test_add_delta(self):
 
1142
        """Store in knit with parents"""
 
1143
        k = KnitVersionedFile('test', get_transport('.'), factory=KnitPlainFactory(),
 
1144
            delta=True, create=True)
 
1145
        self.add_stock_one_and_one_a(k)
 
1146
        k.clear_cache()
 
1147
        self.assertEqualDiff(''.join(k.get_lines('text-1a')), TEXT_1A)
 
1148
 
 
1149
    def test_add_delta_knit_graph_index(self):
 
1150
        """Does adding work with a KnitGraphIndex."""
 
1151
        index = InMemoryGraphIndex(2)
 
1152
        knit_index = KnitGraphIndex(index, add_callback=index.add_nodes,
 
1153
            deltas=True)
 
1154
        k = KnitVersionedFile('test', get_transport('.'),
 
1155
            delta=True, create=True, index=knit_index)
 
1156
        self.add_stock_one_and_one_a(k)
 
1157
        k.clear_cache()
 
1158
        self.assertEqualDiff(''.join(k.get_lines('text-1a')), TEXT_1A)
 
1159
        # check the index had the right data added.
 
1160
        self.assertEqual(set([
 
1161
            (index, ('text-1', ), ' 0 127', ((), ())),
 
1162
            (index, ('text-1a', ), ' 127 140', ((('text-1', ),), (('text-1', ),))),
 
1163
            ]), set(index.iter_all_entries()))
 
1164
        # we should not have a .kndx file
 
1165
        self.assertFalse(get_transport('.').has('test.kndx'))
 
1166
 
 
1167
    def test_annotate(self):
 
1168
        """Annotations"""
 
1169
        k = KnitVersionedFile('knit', get_transport('.'), factory=KnitAnnotateFactory(),
 
1170
            delta=True, create=True)
 
1171
        self.insert_and_test_small_annotate(k)
 
1172
 
 
1173
    def insert_and_test_small_annotate(self, k):
 
1174
        """test annotation with k works correctly."""
 
1175
        k.add_lines('text-1', [], ['a\n', 'b\n'])
 
1176
        k.add_lines('text-2', ['text-1'], ['a\n', 'c\n'])
 
1177
 
 
1178
        origins = k.annotate('text-2')
 
1179
        self.assertEquals(origins[0], ('text-1', 'a\n'))
 
1180
        self.assertEquals(origins[1], ('text-2', 'c\n'))
 
1181
 
 
1182
    def test_annotate_fulltext(self):
 
1183
        """Annotations"""
 
1184
        k = KnitVersionedFile('knit', get_transport('.'), factory=KnitAnnotateFactory(),
 
1185
            delta=False, create=True)
 
1186
        self.insert_and_test_small_annotate(k)
 
1187
 
 
1188
    def test_annotate_merge_1(self):
 
1189
        k = self.make_test_knit(True)
 
1190
        k.add_lines('text-a1', [], ['a\n', 'b\n'])
 
1191
        k.add_lines('text-a2', [], ['d\n', 'c\n'])
 
1192
        k.add_lines('text-am', ['text-a1', 'text-a2'], ['d\n', 'b\n'])
 
1193
        origins = k.annotate('text-am')
 
1194
        self.assertEquals(origins[0], ('text-a2', 'd\n'))
 
1195
        self.assertEquals(origins[1], ('text-a1', 'b\n'))
 
1196
 
 
1197
    def test_annotate_merge_2(self):
 
1198
        k = self.make_test_knit(True)
 
1199
        k.add_lines('text-a1', [], ['a\n', 'b\n', 'c\n'])
 
1200
        k.add_lines('text-a2', [], ['x\n', 'y\n', 'z\n'])
 
1201
        k.add_lines('text-am', ['text-a1', 'text-a2'], ['a\n', 'y\n', 'c\n'])
 
1202
        origins = k.annotate('text-am')
 
1203
        self.assertEquals(origins[0], ('text-a1', 'a\n'))
 
1204
        self.assertEquals(origins[1], ('text-a2', 'y\n'))
 
1205
        self.assertEquals(origins[2], ('text-a1', 'c\n'))
 
1206
 
 
1207
    def test_annotate_merge_9(self):
 
1208
        k = self.make_test_knit(True)
 
1209
        k.add_lines('text-a1', [], ['a\n', 'b\n', 'c\n'])
 
1210
        k.add_lines('text-a2', [], ['x\n', 'y\n', 'z\n'])
 
1211
        k.add_lines('text-am', ['text-a1', 'text-a2'], ['k\n', 'y\n', 'c\n'])
 
1212
        origins = k.annotate('text-am')
 
1213
        self.assertEquals(origins[0], ('text-am', 'k\n'))
 
1214
        self.assertEquals(origins[1], ('text-a2', 'y\n'))
 
1215
        self.assertEquals(origins[2], ('text-a1', 'c\n'))
 
1216
 
 
1217
    def test_annotate_merge_3(self):
 
1218
        k = self.make_test_knit(True)
 
1219
        k.add_lines('text-a1', [], ['a\n', 'b\n', 'c\n'])
 
1220
        k.add_lines('text-a2', [] ,['x\n', 'y\n', 'z\n'])
 
1221
        k.add_lines('text-am', ['text-a1', 'text-a2'], ['k\n', 'y\n', 'z\n'])
 
1222
        origins = k.annotate('text-am')
 
1223
        self.assertEquals(origins[0], ('text-am', 'k\n'))
 
1224
        self.assertEquals(origins[1], ('text-a2', 'y\n'))
 
1225
        self.assertEquals(origins[2], ('text-a2', 'z\n'))
 
1226
 
 
1227
    def test_annotate_merge_4(self):
 
1228
        k = self.make_test_knit(True)
 
1229
        k.add_lines('text-a1', [], ['a\n', 'b\n', 'c\n'])
 
1230
        k.add_lines('text-a2', [], ['x\n', 'y\n', 'z\n'])
 
1231
        k.add_lines('text-a3', ['text-a1'], ['a\n', 'b\n', 'p\n'])
 
1232
        k.add_lines('text-am', ['text-a2', 'text-a3'], ['a\n', 'b\n', 'z\n'])
 
1233
        origins = k.annotate('text-am')
 
1234
        self.assertEquals(origins[0], ('text-a1', 'a\n'))
 
1235
        self.assertEquals(origins[1], ('text-a1', 'b\n'))
 
1236
        self.assertEquals(origins[2], ('text-a2', 'z\n'))
 
1237
 
 
1238
    def test_annotate_merge_5(self):
 
1239
        k = self.make_test_knit(True)
 
1240
        k.add_lines('text-a1', [], ['a\n', 'b\n', 'c\n'])
 
1241
        k.add_lines('text-a2', [], ['d\n', 'e\n', 'f\n'])
 
1242
        k.add_lines('text-a3', [], ['x\n', 'y\n', 'z\n'])
 
1243
        k.add_lines('text-am',
 
1244
                    ['text-a1', 'text-a2', 'text-a3'],
 
1245
                    ['a\n', 'e\n', 'z\n'])
 
1246
        origins = k.annotate('text-am')
 
1247
        self.assertEquals(origins[0], ('text-a1', 'a\n'))
 
1248
        self.assertEquals(origins[1], ('text-a2', 'e\n'))
 
1249
        self.assertEquals(origins[2], ('text-a3', 'z\n'))
 
1250
 
 
1251
    def test_annotate_file_cherry_pick(self):
 
1252
        k = self.make_test_knit(True)
 
1253
        k.add_lines('text-1', [], ['a\n', 'b\n', 'c\n'])
 
1254
        k.add_lines('text-2', ['text-1'], ['d\n', 'e\n', 'f\n'])
 
1255
        k.add_lines('text-3', ['text-2', 'text-1'], ['a\n', 'b\n', 'c\n'])
 
1256
        origins = k.annotate('text-3')
 
1257
        self.assertEquals(origins[0], ('text-1', 'a\n'))
 
1258
        self.assertEquals(origins[1], ('text-1', 'b\n'))
 
1259
        self.assertEquals(origins[2], ('text-1', 'c\n'))
 
1260
 
 
1261
    def test_knit_join(self):
 
1262
        """Store in knit with parents"""
 
1263
        k1 = KnitVersionedFile('test1', get_transport('.'), factory=KnitPlainFactory(), create=True)
 
1264
        k1.add_lines('text-a', [], split_lines(TEXT_1))
 
1265
        k1.add_lines('text-b', ['text-a'], split_lines(TEXT_1))
 
1266
 
 
1267
        k1.add_lines('text-c', [], split_lines(TEXT_1))
 
1268
        k1.add_lines('text-d', ['text-c'], split_lines(TEXT_1))
 
1269
 
 
1270
        k1.add_lines('text-m', ['text-b', 'text-d'], split_lines(TEXT_1))
 
1271
 
 
1272
        k2 = KnitVersionedFile('test2', get_transport('.'), factory=KnitPlainFactory(), create=True)
 
1273
        count = k2.join(k1, version_ids=['text-m'])
 
1274
        self.assertEquals(count, 5)
 
1275
        self.assertTrue(k2.has_version('text-a'))
 
1276
        self.assertTrue(k2.has_version('text-c'))
 
1277
 
 
1278
    def test_reannotate(self):
 
1279
        k1 = KnitVersionedFile('knit1', get_transport('.'),
 
1280
                               factory=KnitAnnotateFactory(), create=True)
 
1281
        # 0
 
1282
        k1.add_lines('text-a', [], ['a\n', 'b\n'])
 
1283
        # 1
 
1284
        k1.add_lines('text-b', ['text-a'], ['a\n', 'c\n'])
 
1285
 
 
1286
        k2 = KnitVersionedFile('test2', get_transport('.'),
 
1287
                               factory=KnitAnnotateFactory(), create=True)
 
1288
        k2.join(k1, version_ids=['text-b'])
 
1289
 
 
1290
        # 2
 
1291
        k1.add_lines('text-X', ['text-b'], ['a\n', 'b\n'])
 
1292
        # 2
 
1293
        k2.add_lines('text-c', ['text-b'], ['z\n', 'c\n'])
 
1294
        # 3
 
1295
        k2.add_lines('text-Y', ['text-b'], ['b\n', 'c\n'])
 
1296
 
 
1297
        # test-c will have index 3
 
1298
        k1.join(k2, version_ids=['text-c'])
 
1299
 
 
1300
        lines = k1.get_lines('text-c')
 
1301
        self.assertEquals(lines, ['z\n', 'c\n'])
 
1302
 
 
1303
        origins = k1.annotate('text-c')
 
1304
        self.assertEquals(origins[0], ('text-c', 'z\n'))
 
1305
        self.assertEquals(origins[1], ('text-b', 'c\n'))
 
1306
 
 
1307
    def test_get_line_delta_texts(self):
 
1308
        """Make sure we can call get_texts on text with reused line deltas"""
 
1309
        k1 = KnitVersionedFile('test1', get_transport('.'), 
 
1310
                               factory=KnitPlainFactory(), create=True)
 
1311
        for t in range(3):
 
1312
            if t == 0:
 
1313
                parents = []
 
1314
            else:
 
1315
                parents = ['%d' % (t-1)]
 
1316
            k1.add_lines('%d' % t, parents, ['hello\n'] * t)
 
1317
        k1.get_texts(('%d' % t) for t in range(3))
 
1318
        
 
1319
    def test_iter_lines_reads_in_order(self):
 
1320
        t = MemoryTransport()
 
1321
        instrumented_t = TransportLogger(t)
 
1322
        k1 = KnitVersionedFile('id', instrumented_t, create=True, delta=True)
 
1323
        self.assertEqual([('id.kndx',)], instrumented_t._calls)
 
1324
        # add texts with no required ordering
 
1325
        k1.add_lines('base', [], ['text\n'])
 
1326
        k1.add_lines('base2', [], ['text2\n'])
 
1327
        k1.clear_cache()
 
1328
        instrumented_t._calls = []
 
1329
        # request a last-first iteration
 
1330
        results = list(k1.iter_lines_added_or_present_in_versions(['base2', 'base']))
 
1331
        self.assertEqual([('id.knit', [(0, 87), (87, 89)])], instrumented_t._calls)
 
1332
        self.assertEqual(['text\n', 'text2\n'], results)
 
1333
 
 
1334
    def test_create_empty_annotated(self):
 
1335
        k1 = self.make_test_knit(True)
 
1336
        # 0
 
1337
        k1.add_lines('text-a', [], ['a\n', 'b\n'])
 
1338
        k2 = k1.create_empty('t', MemoryTransport())
 
1339
        self.assertTrue(isinstance(k2.factory, KnitAnnotateFactory))
 
1340
        self.assertEqual(k1.delta, k2.delta)
 
1341
        # the generic test checks for empty content and file class
 
1342
 
 
1343
    def test_knit_format(self):
 
1344
        # this tests that a new knit index file has the expected content
 
1345
        # and that is writes the data we expect as records are added.
 
1346
        knit = self.make_test_knit(True)
 
1347
        # Now knit files are not created until we first add data to them
 
1348
        self.assertFileEqual("# bzr knit index 8\n", 'test.kndx')
 
1349
        knit.add_lines_with_ghosts('revid', ['a_ghost'], ['a\n'])
 
1350
        self.assertFileEqual(
 
1351
            "# bzr knit index 8\n"
 
1352
            "\n"
 
1353
            "revid fulltext 0 84 .a_ghost :",
 
1354
            'test.kndx')
 
1355
        knit.add_lines_with_ghosts('revid2', ['revid'], ['a\n'])
 
1356
        self.assertFileEqual(
 
1357
            "# bzr knit index 8\n"
 
1358
            "\nrevid fulltext 0 84 .a_ghost :"
 
1359
            "\nrevid2 line-delta 84 82 0 :",
 
1360
            'test.kndx')
 
1361
        # we should be able to load this file again
 
1362
        knit = KnitVersionedFile('test', get_transport('.'), access_mode='r')
 
1363
        self.assertEqual(['revid', 'revid2'], knit.versions())
 
1364
        # write a short write to the file and ensure that its ignored
 
1365
        indexfile = file('test.kndx', 'ab')
 
1366
        indexfile.write('\nrevid3 line-delta 166 82 1 2 3 4 5 .phwoar:demo ')
 
1367
        indexfile.close()
 
1368
        # we should be able to load this file again
 
1369
        knit = KnitVersionedFile('test', get_transport('.'), access_mode='w')
 
1370
        self.assertEqual(['revid', 'revid2'], knit.versions())
 
1371
        # and add a revision with the same id the failed write had
 
1372
        knit.add_lines('revid3', ['revid2'], ['a\n'])
 
1373
        # and when reading it revid3 should now appear.
 
1374
        knit = KnitVersionedFile('test', get_transport('.'), access_mode='r')
 
1375
        self.assertEqual(['revid', 'revid2', 'revid3'], knit.versions())
 
1376
        self.assertEqual(['revid2'], knit.get_parents('revid3'))
 
1377
 
 
1378
    def test_delay_create(self):
 
1379
        """Test that passing delay_create=True creates files late"""
 
1380
        knit = self.make_test_knit(annotate=True, delay_create=True)
 
1381
        self.failIfExists('test.knit')
 
1382
        self.failIfExists('test.kndx')
 
1383
        knit.add_lines_with_ghosts('revid', ['a_ghost'], ['a\n'])
 
1384
        self.failUnlessExists('test.knit')
 
1385
        self.assertFileEqual(
 
1386
            "# bzr knit index 8\n"
 
1387
            "\n"
 
1388
            "revid fulltext 0 84 .a_ghost :",
 
1389
            'test.kndx')
 
1390
 
 
1391
    def test_create_parent_dir(self):
 
1392
        """create_parent_dir can create knits in nonexistant dirs"""
 
1393
        # Has no effect if we don't set 'delay_create'
 
1394
        trans = get_transport('.')
 
1395
        self.assertRaises(NoSuchFile, KnitVersionedFile, 'dir/test',
 
1396
                          trans, access_mode='w', factory=None,
 
1397
                          create=True, create_parent_dir=True)
 
1398
        # Nothing should have changed yet
 
1399
        knit = KnitVersionedFile('dir/test', trans, access_mode='w',
 
1400
                                 factory=None, create=True,
 
1401
                                 create_parent_dir=True,
 
1402
                                 delay_create=True)
 
1403
        self.failIfExists('dir/test.knit')
 
1404
        self.failIfExists('dir/test.kndx')
 
1405
        self.failIfExists('dir')
 
1406
        knit.add_lines('revid', [], ['a\n'])
 
1407
        self.failUnlessExists('dir')
 
1408
        self.failUnlessExists('dir/test.knit')
 
1409
        self.assertFileEqual(
 
1410
            "# bzr knit index 8\n"
 
1411
            "\n"
 
1412
            "revid fulltext 0 84  :",
 
1413
            'dir/test.kndx')
 
1414
 
 
1415
    def test_create_mode_700(self):
 
1416
        trans = get_transport('.')
 
1417
        if not trans._can_roundtrip_unix_modebits():
 
1418
            # Can't roundtrip, so no need to run this test
 
1419
            return
 
1420
        knit = KnitVersionedFile('dir/test', trans, access_mode='w',
 
1421
                                 factory=None, create=True,
 
1422
                                 create_parent_dir=True,
 
1423
                                 delay_create=True,
 
1424
                                 file_mode=0600,
 
1425
                                 dir_mode=0700)
 
1426
        knit.add_lines('revid', [], ['a\n'])
 
1427
        self.assertTransportMode(trans, 'dir', 0700)
 
1428
        self.assertTransportMode(trans, 'dir/test.knit', 0600)
 
1429
        self.assertTransportMode(trans, 'dir/test.kndx', 0600)
 
1430
 
 
1431
    def test_create_mode_770(self):
 
1432
        trans = get_transport('.')
 
1433
        if not trans._can_roundtrip_unix_modebits():
 
1434
            # Can't roundtrip, so no need to run this test
 
1435
            return
 
1436
        knit = KnitVersionedFile('dir/test', trans, access_mode='w',
 
1437
                                 factory=None, create=True,
 
1438
                                 create_parent_dir=True,
 
1439
                                 delay_create=True,
 
1440
                                 file_mode=0660,
 
1441
                                 dir_mode=0770)
 
1442
        knit.add_lines('revid', [], ['a\n'])
 
1443
        self.assertTransportMode(trans, 'dir', 0770)
 
1444
        self.assertTransportMode(trans, 'dir/test.knit', 0660)
 
1445
        self.assertTransportMode(trans, 'dir/test.kndx', 0660)
 
1446
 
 
1447
    def test_create_mode_777(self):
 
1448
        trans = get_transport('.')
 
1449
        if not trans._can_roundtrip_unix_modebits():
 
1450
            # Can't roundtrip, so no need to run this test
 
1451
            return
 
1452
        knit = KnitVersionedFile('dir/test', trans, access_mode='w',
 
1453
                                 factory=None, create=True,
 
1454
                                 create_parent_dir=True,
 
1455
                                 delay_create=True,
 
1456
                                 file_mode=0666,
 
1457
                                 dir_mode=0777)
 
1458
        knit.add_lines('revid', [], ['a\n'])
 
1459
        self.assertTransportMode(trans, 'dir', 0777)
 
1460
        self.assertTransportMode(trans, 'dir/test.knit', 0666)
 
1461
        self.assertTransportMode(trans, 'dir/test.kndx', 0666)
 
1462
 
 
1463
    def test_plan_merge(self):
 
1464
        my_knit = self.make_test_knit(annotate=True)
 
1465
        my_knit.add_lines('text1', [], split_lines(TEXT_1))
 
1466
        my_knit.add_lines('text1a', ['text1'], split_lines(TEXT_1A))
 
1467
        my_knit.add_lines('text1b', ['text1'], split_lines(TEXT_1B))
 
1468
        plan = list(my_knit.plan_merge('text1a', 'text1b'))
 
1469
        for plan_line, expected_line in zip(plan, AB_MERGE):
 
1470
            self.assertEqual(plan_line, expected_line)
 
1471
 
 
1472
    def test_get_stream_empty(self):
 
1473
        """Get a data stream for an empty knit file."""
 
1474
        k1 = self.make_test_knit()
 
1475
        format, data_list, reader_callable = k1.get_data_stream([])
 
1476
        self.assertEqual('knit-plain', format)
 
1477
        self.assertEqual([], data_list)
 
1478
        content = reader_callable(None)
 
1479
        self.assertEqual('', content)
 
1480
        self.assertIsInstance(content, str)
 
1481
 
 
1482
    def test_get_stream_one_version(self):
 
1483
        """Get a data stream for a single record out of a knit containing just
 
1484
        one record.
 
1485
        """
 
1486
        k1 = self.make_test_knit()
 
1487
        test_data = [
 
1488
            ('text-a', [], TEXT_1),
 
1489
            ]
 
1490
        expected_data_list = [
 
1491
            # version, options, length, parents
 
1492
            ('text-a', ['fulltext'], 122, []),
 
1493
           ]
 
1494
        for version_id, parents, lines in test_data:
 
1495
            k1.add_lines(version_id, parents, split_lines(lines))
 
1496
 
 
1497
        format, data_list, reader_callable = k1.get_data_stream(['text-a'])
 
1498
        self.assertEqual('knit-plain', format)
 
1499
        self.assertEqual(expected_data_list, data_list)
 
1500
        # There's only one record in the knit, so the content should be the
 
1501
        # entire knit data file's contents.
 
1502
        self.assertEqual(k1.transport.get_bytes(k1._data._access._filename),
 
1503
                         reader_callable(None))
 
1504
        
 
1505
    def test_get_stream_get_one_version_of_many(self):
 
1506
        """Get a data stream for just one version out of a knit containing many
 
1507
        versions.
 
1508
        """
 
1509
        k1 = self.make_test_knit()
 
1510
        # Insert the same data as test_knit_join, as they seem to cover a range
 
1511
        # of cases (no parents, one parent, multiple parents).
 
1512
        test_data = [
 
1513
            ('text-a', [], TEXT_1),
 
1514
            ('text-b', ['text-a'], TEXT_1),
 
1515
            ('text-c', [], TEXT_1),
 
1516
            ('text-d', ['text-c'], TEXT_1),
 
1517
            ('text-m', ['text-b', 'text-d'], TEXT_1),
 
1518
            ]
 
1519
        expected_data_list = [
 
1520
            # version, options, length, parents
 
1521
            ('text-m', ['line-delta'], 84, ['text-b', 'text-d']),
 
1522
            ]
 
1523
        for version_id, parents, lines in test_data:
 
1524
            k1.add_lines(version_id, parents, split_lines(lines))
 
1525
 
 
1526
        format, data_list, reader_callable = k1.get_data_stream(['text-m'])
 
1527
        self.assertEqual('knit-plain', format)
 
1528
        self.assertEqual(expected_data_list, data_list)
 
1529
        self.assertRecordContentEqual(k1, 'text-m', reader_callable(None))
 
1530
        
 
1531
    def test_get_stream_ghost_parent(self):
 
1532
        """Get a data stream for a version with a ghost parent."""
 
1533
        k1 = self.make_test_knit()
 
1534
        # Test data
 
1535
        k1.add_lines('text-a', [], split_lines(TEXT_1))
 
1536
        k1.add_lines_with_ghosts('text-b', ['text-a', 'text-ghost'],
 
1537
                                 split_lines(TEXT_1))
 
1538
        # Expected data
 
1539
        expected_data_list = [
 
1540
            # version, options, length, parents
 
1541
            ('text-b', ['line-delta'], 84, ['text-a', 'text-ghost']),
 
1542
            ]
 
1543
        
 
1544
        format, data_list, reader_callable = k1.get_data_stream(['text-b'])
 
1545
        self.assertEqual('knit-plain', format)
 
1546
        self.assertEqual(expected_data_list, data_list)
 
1547
        self.assertRecordContentEqual(k1, 'text-b', reader_callable(None))
 
1548
    
 
1549
    def test_get_stream_get_multiple_records(self):
 
1550
        """Get a stream for multiple records of a knit."""
 
1551
        k1 = self.make_test_knit()
 
1552
        # Insert the same data as test_knit_join, as they seem to cover a range
 
1553
        # of cases (no parents, one parent, multiple parents).
 
1554
        test_data = [
 
1555
            ('text-a', [], TEXT_1),
 
1556
            ('text-b', ['text-a'], TEXT_1),
 
1557
            ('text-c', [], TEXT_1),
 
1558
            ('text-d', ['text-c'], TEXT_1),
 
1559
            ('text-m', ['text-b', 'text-d'], TEXT_1),
 
1560
            ]
 
1561
        expected_data_list = [
 
1562
            # version, options, length, parents
 
1563
            ('text-b', ['line-delta'], 84, ['text-a']),
 
1564
            ('text-d', ['line-delta'], 84, ['text-c']),
 
1565
            ]
 
1566
        for version_id, parents, lines in test_data:
 
1567
            k1.add_lines(version_id, parents, split_lines(lines))
 
1568
 
 
1569
        # Note that even though we request the revision IDs in a particular
 
1570
        # order, the data stream may return them in any order it likes.  In this
 
1571
        # case, they'll be in the order they were inserted into the knit.
 
1572
        format, data_list, reader_callable = k1.get_data_stream(
 
1573
            ['text-d', 'text-b'])
 
1574
        self.assertEqual('knit-plain', format)
 
1575
        self.assertEqual(expected_data_list, data_list)
 
1576
        self.assertRecordContentEqual(k1, 'text-b', reader_callable(84))
 
1577
        self.assertRecordContentEqual(k1, 'text-d', reader_callable(84))
 
1578
        self.assertEqual('', reader_callable(None),
 
1579
                         "There should be no more bytes left to read.")
 
1580
 
 
1581
    def test_get_stream_all(self):
 
1582
        """Get a data stream for all the records in a knit.
 
1583
 
 
1584
        This exercises fulltext records, line-delta records, records with
 
1585
        various numbers of parents, and reading multiple records out of the
 
1586
        callable.  These cases ought to all be exercised individually by the
 
1587
        other test_get_stream_* tests; this test is basically just paranoia.
 
1588
        """
 
1589
        k1 = self.make_test_knit()
 
1590
        # Insert the same data as test_knit_join, as they seem to cover a range
 
1591
        # of cases (no parents, one parent, multiple parents).
 
1592
        test_data = [
 
1593
            ('text-a', [], TEXT_1),
 
1594
            ('text-b', ['text-a'], TEXT_1),
 
1595
            ('text-c', [], TEXT_1),
 
1596
            ('text-d', ['text-c'], TEXT_1),
 
1597
            ('text-m', ['text-b', 'text-d'], TEXT_1),
 
1598
           ]
 
1599
        expected_data_list = [
 
1600
            # version, options, length, parents
 
1601
            ('text-a', ['fulltext'], 122, []),
 
1602
            ('text-b', ['line-delta'], 84, ['text-a']),
 
1603
            ('text-c', ['fulltext'], 121, []),
 
1604
            ('text-d', ['line-delta'], 84, ['text-c']),
 
1605
            ('text-m', ['line-delta'], 84, ['text-b', 'text-d']),
 
1606
            ]
 
1607
        for version_id, parents, lines in test_data:
 
1608
            k1.add_lines(version_id, parents, split_lines(lines))
 
1609
 
 
1610
        format, data_list, reader_callable = k1.get_data_stream(
 
1611
            ['text-a', 'text-b', 'text-c', 'text-d', 'text-m'])
 
1612
        self.assertEqual('knit-plain', format)
 
1613
        self.assertEqual(expected_data_list, data_list)
 
1614
        for version_id, options, length, parents in expected_data_list:
 
1615
            bytes = reader_callable(length)
 
1616
            self.assertRecordContentEqual(k1, version_id, bytes)
 
1617
 
 
1618
    def assertKnitFilesEqual(self, knit1, knit2):
 
1619
        """Assert that the contents of the index and data files of two knits are
 
1620
        equal.
 
1621
        """
 
1622
        self.assertEqual(
 
1623
            knit1.transport.get_bytes(knit1._data._access._filename),
 
1624
            knit2.transport.get_bytes(knit2._data._access._filename))
 
1625
        self.assertEqual(
 
1626
            knit1.transport.get_bytes(knit1._index._filename),
 
1627
            knit2.transport.get_bytes(knit2._index._filename))
 
1628
 
 
1629
    def test_insert_data_stream_empty(self):
 
1630
        """Inserting a data stream with no records should not put any data into
 
1631
        the knit.
 
1632
        """
 
1633
        k1 = self.make_test_knit()
 
1634
        k1.insert_data_stream(
 
1635
            (k1.get_format_signature(), [], lambda ignored: ''))
 
1636
        self.assertEqual('', k1.transport.get_bytes(k1._data._access._filename),
 
1637
                         "The .knit should be completely empty.")
 
1638
        self.assertEqual(k1._index.HEADER,
 
1639
                         k1.transport.get_bytes(k1._index._filename),
 
1640
                         "The .kndx should have nothing apart from the header.")
 
1641
 
 
1642
    def test_insert_data_stream_one_record(self):
 
1643
        """Inserting a data stream with one record from a knit with one record
 
1644
        results in byte-identical files.
 
1645
        """
 
1646
        source = self.make_test_knit(name='source')
 
1647
        source.add_lines('text-a', [], split_lines(TEXT_1))
 
1648
        data_stream = source.get_data_stream(['text-a'])
 
1649
        
 
1650
        target = self.make_test_knit(name='target')
 
1651
        target.insert_data_stream(data_stream)
 
1652
        
 
1653
        self.assertKnitFilesEqual(source, target)
 
1654
 
 
1655
    def test_insert_data_stream_records_already_present(self):
 
1656
        """Insert a data stream where some records are alreday present in the
 
1657
        target, and some not.  Only the new records are inserted.
 
1658
        """
 
1659
        source = self.make_test_knit(name='source')
 
1660
        target = self.make_test_knit(name='target')
 
1661
        # Insert 'text-a' into both source and target
 
1662
        source.add_lines('text-a', [], split_lines(TEXT_1))
 
1663
        target.insert_data_stream(source.get_data_stream(['text-a']))
 
1664
        # Insert 'text-b' into just the source.
 
1665
        source.add_lines('text-b', ['text-a'], split_lines(TEXT_1))
 
1666
        # Get a data stream of both text-a and text-b, and insert it.
 
1667
        data_stream = source.get_data_stream(['text-a', 'text-b'])
 
1668
        target.insert_data_stream(data_stream)
 
1669
        # The source and target will now be identical.  This means the text-a
 
1670
        # record was not added a second time.
 
1671
        self.assertKnitFilesEqual(source, target)
 
1672
 
 
1673
    def test_insert_data_stream_multiple_records(self):
 
1674
        """Inserting a data stream of all records from a knit with multiple
 
1675
        records results in byte-identical files.
 
1676
        """
 
1677
        source = self.make_test_knit(name='source')
 
1678
        source.add_lines('text-a', [], split_lines(TEXT_1))
 
1679
        source.add_lines('text-b', ['text-a'], split_lines(TEXT_1))
 
1680
        source.add_lines('text-c', [], split_lines(TEXT_1))
 
1681
        data_stream = source.get_data_stream(['text-a', 'text-b', 'text-c'])
 
1682
        
 
1683
        target = self.make_test_knit(name='target')
 
1684
        target.insert_data_stream(data_stream)
 
1685
        
 
1686
        self.assertKnitFilesEqual(source, target)
 
1687
 
 
1688
    def test_insert_data_stream_ghost_parent(self):
 
1689
        """Insert a data stream with a record that has a ghost parent."""
 
1690
        # Make a knit with a record, text-a, that has a ghost parent.
 
1691
        source = self.make_test_knit(name='source')
 
1692
        source.add_lines_with_ghosts('text-a', ['text-ghost'],
 
1693
                                     split_lines(TEXT_1))
 
1694
        data_stream = source.get_data_stream(['text-a'])
 
1695
 
 
1696
        target = self.make_test_knit(name='target')
 
1697
        target.insert_data_stream(data_stream)
 
1698
 
 
1699
        self.assertKnitFilesEqual(source, target)
 
1700
 
 
1701
        # The target knit object is in a consistent state, i.e. the record we
 
1702
        # just added is immediately visible.
 
1703
        self.assertTrue(target.has_version('text-a'))
 
1704
        self.assertTrue(target.has_ghost('text-ghost'))
 
1705
        self.assertEqual(split_lines(TEXT_1), target.get_lines('text-a'))
 
1706
 
 
1707
    def test_insert_data_stream_inconsistent_version_lines(self):
 
1708
        """Inserting a data stream which has different content for a version_id
 
1709
        than already exists in the knit will raise KnitCorrupt.
 
1710
        """
 
1711
        source = self.make_test_knit(name='source')
 
1712
        target = self.make_test_knit(name='target')
 
1713
        # Insert a different 'text-a' into both source and target
 
1714
        source.add_lines('text-a', [], split_lines(TEXT_1))
 
1715
        target.add_lines('text-a', [], split_lines(TEXT_2))
 
1716
        # Insert a data stream with conflicting content into the target
 
1717
        data_stream = source.get_data_stream(['text-a'])
 
1718
        self.assertRaises(
 
1719
            errors.KnitCorrupt, target.insert_data_stream, data_stream)
 
1720
 
 
1721
    def test_insert_data_stream_inconsistent_version_parents(self):
 
1722
        """Inserting a data stream which has different parents for a version_id
 
1723
        than already exists in the knit will raise KnitCorrupt.
 
1724
        """
 
1725
        source = self.make_test_knit(name='source')
 
1726
        target = self.make_test_knit(name='target')
 
1727
        # Insert a different 'text-a' into both source and target.  They differ
 
1728
        # only by the parents list, the content is the same.
 
1729
        source.add_lines_with_ghosts('text-a', [], split_lines(TEXT_1))
 
1730
        target.add_lines_with_ghosts('text-a', ['a-ghost'], split_lines(TEXT_1))
 
1731
        # Insert a data stream with conflicting content into the target
 
1732
        data_stream = source.get_data_stream(['text-a'])
 
1733
        self.assertRaises(
 
1734
            errors.KnitCorrupt, target.insert_data_stream, data_stream)
 
1735
 
 
1736
    def test_insert_data_stream_incompatible_format(self):
 
1737
        """A data stream in a different format to the target knit cannot be
 
1738
        inserted.
 
1739
 
 
1740
        It will raise KnitDataStreamIncompatible.
 
1741
        """
 
1742
        data_stream = ('fake-format-signature', [], lambda _: '')
 
1743
        target = self.make_test_knit(name='target')
 
1744
        self.assertRaises(
 
1745
            errors.KnitDataStreamIncompatible,
 
1746
            target.insert_data_stream, data_stream)
 
1747
 
 
1748
    #  * test that a stream of "already present version, then new version"
 
1749
    #    inserts correctly.
 
1750
 
 
1751
TEXT_1 = """\
 
1752
Banana cup cakes:
 
1753
 
 
1754
- bananas
 
1755
- eggs
 
1756
- broken tea cups
 
1757
"""
 
1758
 
 
1759
TEXT_1A = """\
 
1760
Banana cup cake recipe
 
1761
(serves 6)
 
1762
 
 
1763
- bananas
 
1764
- eggs
 
1765
- broken tea cups
 
1766
- self-raising flour
 
1767
"""
 
1768
 
 
1769
TEXT_1B = """\
 
1770
Banana cup cake recipe
 
1771
 
 
1772
- bananas (do not use plantains!!!)
 
1773
- broken tea cups
 
1774
- flour
 
1775
"""
 
1776
 
 
1777
delta_1_1a = """\
 
1778
0,1,2
 
1779
Banana cup cake recipe
 
1780
(serves 6)
 
1781
5,5,1
 
1782
- self-raising flour
 
1783
"""
 
1784
 
 
1785
TEXT_2 = """\
 
1786
Boeuf bourguignon
 
1787
 
 
1788
- beef
 
1789
- red wine
 
1790
- small onions
 
1791
- carrot
 
1792
- mushrooms
 
1793
"""
 
1794
 
 
1795
AB_MERGE_TEXT="""unchanged|Banana cup cake recipe
 
1796
new-a|(serves 6)
 
1797
unchanged|
 
1798
killed-b|- bananas
 
1799
killed-b|- eggs
 
1800
new-b|- bananas (do not use plantains!!!)
 
1801
unchanged|- broken tea cups
 
1802
new-a|- self-raising flour
 
1803
new-b|- flour
 
1804
"""
 
1805
AB_MERGE=[tuple(l.split('|')) for l in AB_MERGE_TEXT.splitlines(True)]
 
1806
 
 
1807
 
 
1808
def line_delta(from_lines, to_lines):
 
1809
    """Generate line-based delta from one text to another"""
 
1810
    s = difflib.SequenceMatcher(None, from_lines, to_lines)
 
1811
    for op in s.get_opcodes():
 
1812
        if op[0] == 'equal':
 
1813
            continue
 
1814
        yield '%d,%d,%d\n' % (op[1], op[2], op[4]-op[3])
 
1815
        for i in range(op[3], op[4]):
 
1816
            yield to_lines[i]
 
1817
 
 
1818
 
 
1819
def apply_line_delta(basis_lines, delta_lines):
 
1820
    """Apply a line-based perfect diff
 
1821
    
 
1822
    basis_lines -- text to apply the patch to
 
1823
    delta_lines -- diff instructions and content
 
1824
    """
 
1825
    out = basis_lines[:]
 
1826
    i = 0
 
1827
    offset = 0
 
1828
    while i < len(delta_lines):
 
1829
        l = delta_lines[i]
 
1830
        a, b, c = map(long, l.split(','))
 
1831
        i = i + 1
 
1832
        out[offset+a:offset+b] = delta_lines[i:i+c]
 
1833
        i = i + c
 
1834
        offset = offset + (b - a) + c
 
1835
    return out
 
1836
 
 
1837
 
 
1838
class TestWeaveToKnit(KnitTests):
 
1839
 
 
1840
    def test_weave_to_knit_matches(self):
 
1841
        # check that the WeaveToKnit is_compatible function
 
1842
        # registers True for a Weave to a Knit.
 
1843
        w = Weave()
 
1844
        k = self.make_test_knit()
 
1845
        self.failUnless(WeaveToKnit.is_compatible(w, k))
 
1846
        self.failIf(WeaveToKnit.is_compatible(k, w))
 
1847
        self.failIf(WeaveToKnit.is_compatible(w, w))
 
1848
        self.failIf(WeaveToKnit.is_compatible(k, k))
 
1849
 
 
1850
 
 
1851
class TestKnitCaching(KnitTests):
 
1852
    
 
1853
    def create_knit(self, cache_add=False):
 
1854
        k = self.make_test_knit(True)
 
1855
        if cache_add:
 
1856
            k.enable_cache()
 
1857
 
 
1858
        k.add_lines('text-1', [], split_lines(TEXT_1))
 
1859
        k.add_lines('text-2', [], split_lines(TEXT_2))
 
1860
        return k
 
1861
 
 
1862
    def test_no_caching(self):
 
1863
        k = self.create_knit()
 
1864
        # Nothing should be cached without setting 'enable_cache'
 
1865
        self.assertEqual({}, k._data._cache)
 
1866
 
 
1867
    def test_cache_add_and_clear(self):
 
1868
        k = self.create_knit(True)
 
1869
 
 
1870
        self.assertEqual(['text-1', 'text-2'], sorted(k._data._cache.keys()))
 
1871
 
 
1872
        k.clear_cache()
 
1873
        self.assertEqual({}, k._data._cache)
 
1874
 
 
1875
    def test_cache_data_read_raw(self):
 
1876
        k = self.create_knit()
 
1877
 
 
1878
        # Now cache and read
 
1879
        k.enable_cache()
 
1880
 
 
1881
        def read_one_raw(version):
 
1882
            pos_map = k._get_components_positions([version])
 
1883
            method, index_memo, next = pos_map[version]
 
1884
            lst = list(k._data.read_records_iter_raw([(version, index_memo)]))
 
1885
            self.assertEqual(1, len(lst))
 
1886
            return lst[0]
 
1887
 
 
1888
        val = read_one_raw('text-1')
 
1889
        self.assertEqual({'text-1':val[1]}, k._data._cache)
 
1890
 
 
1891
        k.clear_cache()
 
1892
        # After clear, new reads are not cached
 
1893
        self.assertEqual({}, k._data._cache)
 
1894
 
 
1895
        val2 = read_one_raw('text-1')
 
1896
        self.assertEqual(val, val2)
 
1897
        self.assertEqual({}, k._data._cache)
 
1898
 
 
1899
    def test_cache_data_read(self):
 
1900
        k = self.create_knit()
 
1901
 
 
1902
        def read_one(version):
 
1903
            pos_map = k._get_components_positions([version])
 
1904
            method, index_memo, next = pos_map[version]
 
1905
            lst = list(k._data.read_records_iter([(version, index_memo)]))
 
1906
            self.assertEqual(1, len(lst))
 
1907
            return lst[0]
 
1908
 
 
1909
        # Now cache and read
 
1910
        k.enable_cache()
 
1911
 
 
1912
        val = read_one('text-2')
 
1913
        self.assertEqual(['text-2'], k._data._cache.keys())
 
1914
        self.assertEqual('text-2', val[0])
 
1915
        content, digest = k._data._parse_record('text-2',
 
1916
                                                k._data._cache['text-2'])
 
1917
        self.assertEqual(content, val[1])
 
1918
        self.assertEqual(digest, val[2])
 
1919
 
 
1920
        k.clear_cache()
 
1921
        self.assertEqual({}, k._data._cache)
 
1922
 
 
1923
        val2 = read_one('text-2')
 
1924
        self.assertEqual(val, val2)
 
1925
        self.assertEqual({}, k._data._cache)
 
1926
 
 
1927
    def test_cache_read(self):
 
1928
        k = self.create_knit()
 
1929
        k.enable_cache()
 
1930
 
 
1931
        text = k.get_text('text-1')
 
1932
        self.assertEqual(TEXT_1, text)
 
1933
        self.assertEqual(['text-1'], k._data._cache.keys())
 
1934
 
 
1935
        k.clear_cache()
 
1936
        self.assertEqual({}, k._data._cache)
 
1937
 
 
1938
        text = k.get_text('text-1')
 
1939
        self.assertEqual(TEXT_1, text)
 
1940
        self.assertEqual({}, k._data._cache)
 
1941
 
 
1942
 
 
1943
class TestKnitIndex(KnitTests):
 
1944
 
 
1945
    def test_add_versions_dictionary_compresses(self):
 
1946
        """Adding versions to the index should update the lookup dict"""
 
1947
        knit = self.make_test_knit()
 
1948
        idx = knit._index
 
1949
        idx.add_version('a-1', ['fulltext'], (None, 0, 0), [])
 
1950
        self.check_file_contents('test.kndx',
 
1951
            '# bzr knit index 8\n'
 
1952
            '\n'
 
1953
            'a-1 fulltext 0 0  :'
 
1954
            )
 
1955
        idx.add_versions([('a-2', ['fulltext'], (None, 0, 0), ['a-1']),
 
1956
                          ('a-3', ['fulltext'], (None, 0, 0), ['a-2']),
 
1957
                         ])
 
1958
        self.check_file_contents('test.kndx',
 
1959
            '# bzr knit index 8\n'
 
1960
            '\n'
 
1961
            'a-1 fulltext 0 0  :\n'
 
1962
            'a-2 fulltext 0 0 0 :\n'
 
1963
            'a-3 fulltext 0 0 1 :'
 
1964
            )
 
1965
        self.assertEqual(['a-1', 'a-2', 'a-3'], idx._history)
 
1966
        self.assertEqual({'a-1':('a-1', ['fulltext'], 0, 0, [], 0),
 
1967
                          'a-2':('a-2', ['fulltext'], 0, 0, ['a-1'], 1),
 
1968
                          'a-3':('a-3', ['fulltext'], 0, 0, ['a-2'], 2),
 
1969
                         }, idx._cache)
 
1970
 
 
1971
    def test_add_versions_fails_clean(self):
 
1972
        """If add_versions fails in the middle, it restores a pristine state.
 
1973
 
 
1974
        Any modifications that are made to the index are reset if all versions
 
1975
        cannot be added.
 
1976
        """
 
1977
        # This cheats a little bit by passing in a generator which will
 
1978
        # raise an exception before the processing finishes
 
1979
        # Other possibilities would be to have an version with the wrong number
 
1980
        # of entries, or to make the backing transport unable to write any
 
1981
        # files.
 
1982
 
 
1983
        knit = self.make_test_knit()
 
1984
        idx = knit._index
 
1985
        idx.add_version('a-1', ['fulltext'], (None, 0, 0), [])
 
1986
 
 
1987
        class StopEarly(Exception):
 
1988
            pass
 
1989
 
 
1990
        def generate_failure():
 
1991
            """Add some entries and then raise an exception"""
 
1992
            yield ('a-2', ['fulltext'], (None, 0, 0), ['a-1'])
 
1993
            yield ('a-3', ['fulltext'], (None, 0, 0), ['a-2'])
 
1994
            raise StopEarly()
 
1995
 
 
1996
        # Assert the pre-condition
 
1997
        self.assertEqual(['a-1'], idx._history)
 
1998
        self.assertEqual({'a-1':('a-1', ['fulltext'], 0, 0, [], 0)}, idx._cache)
 
1999
 
 
2000
        self.assertRaises(StopEarly, idx.add_versions, generate_failure())
 
2001
 
 
2002
        # And it shouldn't be modified
 
2003
        self.assertEqual(['a-1'], idx._history)
 
2004
        self.assertEqual({'a-1':('a-1', ['fulltext'], 0, 0, [], 0)}, idx._cache)
 
2005
 
 
2006
    def test_knit_index_ignores_empty_files(self):
 
2007
        # There was a race condition in older bzr, where a ^C at the right time
 
2008
        # could leave an empty .kndx file, which bzr would later claim was a
 
2009
        # corrupted file since the header was not present. In reality, the file
 
2010
        # just wasn't created, so it should be ignored.
 
2011
        t = get_transport('.')
 
2012
        t.put_bytes('test.kndx', '')
 
2013
 
 
2014
        knit = self.make_test_knit()
 
2015
 
 
2016
    def test_knit_index_checks_header(self):
 
2017
        t = get_transport('.')
 
2018
        t.put_bytes('test.kndx', '# not really a knit header\n\n')
 
2019
 
 
2020
        self.assertRaises(KnitHeaderError, self.make_test_knit)
 
2021
 
 
2022
 
 
2023
class TestGraphIndexKnit(KnitTests):
 
2024
    """Tests for knits using a GraphIndex rather than a KnitIndex."""
 
2025
 
 
2026
    def make_g_index(self, name, ref_lists=0, nodes=[]):
 
2027
        builder = GraphIndexBuilder(ref_lists)
 
2028
        for node, references, value in nodes:
 
2029
            builder.add_node(node, references, value)
 
2030
        stream = builder.finish()
 
2031
        trans = self.get_transport()
 
2032
        trans.put_file(name, stream)
 
2033
        return GraphIndex(trans, name)
 
2034
 
 
2035
    def two_graph_index(self, deltas=False, catch_adds=False):
 
2036
        """Build a two-graph index.
 
2037
 
 
2038
        :param deltas: If true, use underlying indices with two node-ref
 
2039
            lists and 'parent' set to a delta-compressed against tail.
 
2040
        """
 
2041
        # build a complex graph across several indices.
 
2042
        if deltas:
 
2043
            # delta compression inn the index
 
2044
            index1 = self.make_g_index('1', 2, [
 
2045
                (('tip', ), 'N0 100', ([('parent', )], [], )),
 
2046
                (('tail', ), '', ([], []))])
 
2047
            index2 = self.make_g_index('2', 2, [
 
2048
                (('parent', ), ' 100 78', ([('tail', ), ('ghost', )], [('tail', )])),
 
2049
                (('separate', ), '', ([], []))])
 
2050
        else:
 
2051
            # just blob location and graph in the index.
 
2052
            index1 = self.make_g_index('1', 1, [
 
2053
                (('tip', ), 'N0 100', ([('parent', )], )),
 
2054
                (('tail', ), '', ([], ))])
 
2055
            index2 = self.make_g_index('2', 1, [
 
2056
                (('parent', ), ' 100 78', ([('tail', ), ('ghost', )], )),
 
2057
                (('separate', ), '', ([], ))])
 
2058
        combined_index = CombinedGraphIndex([index1, index2])
 
2059
        if catch_adds:
 
2060
            self.combined_index = combined_index
 
2061
            self.caught_entries = []
 
2062
            add_callback = self.catch_add
 
2063
        else:
 
2064
            add_callback = None
 
2065
        return KnitGraphIndex(combined_index, deltas=deltas,
 
2066
            add_callback=add_callback)
 
2067
 
 
2068
    def test_get_graph(self):
 
2069
        index = self.two_graph_index()
 
2070
        self.assertEqual(set([
 
2071
            ('tip', ('parent', )),
 
2072
            ('tail', ()),
 
2073
            ('parent', ('tail', 'ghost')),
 
2074
            ('separate', ()),
 
2075
            ]), set(index.get_graph()))
 
2076
 
 
2077
    def test_get_ancestry(self):
 
2078
        # get_ancestry is defined as eliding ghosts, not erroring.
 
2079
        index = self.two_graph_index()
 
2080
        self.assertEqual([], index.get_ancestry([]))
 
2081
        self.assertEqual(['separate'], index.get_ancestry(['separate']))
 
2082
        self.assertEqual(['tail'], index.get_ancestry(['tail']))
 
2083
        self.assertEqual(['tail', 'parent'], index.get_ancestry(['parent']))
 
2084
        self.assertEqual(['tail', 'parent', 'tip'], index.get_ancestry(['tip']))
 
2085
        self.assertTrue(index.get_ancestry(['tip', 'separate']) in
 
2086
            (['tail', 'parent', 'tip', 'separate'],
 
2087
             ['separate', 'tail', 'parent', 'tip'],
 
2088
            ))
 
2089
        # and without topo_sort
 
2090
        self.assertEqual(set(['separate']),
 
2091
            set(index.get_ancestry(['separate'], topo_sorted=False)))
 
2092
        self.assertEqual(set(['tail']),
 
2093
            set(index.get_ancestry(['tail'], topo_sorted=False)))
 
2094
        self.assertEqual(set(['tail', 'parent']),
 
2095
            set(index.get_ancestry(['parent'], topo_sorted=False)))
 
2096
        self.assertEqual(set(['tail', 'parent', 'tip']),
 
2097
            set(index.get_ancestry(['tip'], topo_sorted=False)))
 
2098
        self.assertEqual(set(['separate', 'tail', 'parent', 'tip']),
 
2099
            set(index.get_ancestry(['tip', 'separate'])))
 
2100
        # asking for a ghost makes it go boom.
 
2101
        self.assertRaises(errors.RevisionNotPresent, index.get_ancestry, ['ghost'])
 
2102
 
 
2103
    def test_get_ancestry_with_ghosts(self):
 
2104
        index = self.two_graph_index()
 
2105
        self.assertEqual([], index.get_ancestry_with_ghosts([]))
 
2106
        self.assertEqual(['separate'], index.get_ancestry_with_ghosts(['separate']))
 
2107
        self.assertEqual(['tail'], index.get_ancestry_with_ghosts(['tail']))
 
2108
        self.assertTrue(index.get_ancestry_with_ghosts(['parent']) in
 
2109
            (['tail', 'ghost', 'parent'],
 
2110
             ['ghost', 'tail', 'parent'],
 
2111
            ))
 
2112
        self.assertTrue(index.get_ancestry_with_ghosts(['tip']) in
 
2113
            (['tail', 'ghost', 'parent', 'tip'],
 
2114
             ['ghost', 'tail', 'parent', 'tip'],
 
2115
            ))
 
2116
        self.assertTrue(index.get_ancestry_with_ghosts(['tip', 'separate']) in
 
2117
            (['tail', 'ghost', 'parent', 'tip', 'separate'],
 
2118
             ['ghost', 'tail', 'parent', 'tip', 'separate'],
 
2119
             ['separate', 'tail', 'ghost', 'parent', 'tip'],
 
2120
             ['separate', 'ghost', 'tail', 'parent', 'tip'],
 
2121
            ))
 
2122
        # asking for a ghost makes it go boom.
 
2123
        self.assertRaises(errors.RevisionNotPresent, index.get_ancestry_with_ghosts, ['ghost'])
 
2124
 
 
2125
    def test_num_versions(self):
 
2126
        index = self.two_graph_index()
 
2127
        self.assertEqual(4, index.num_versions())
 
2128
 
 
2129
    def test_get_versions(self):
 
2130
        index = self.two_graph_index()
 
2131
        self.assertEqual(set(['tail', 'tip', 'parent', 'separate']),
 
2132
            set(index.get_versions()))
 
2133
 
 
2134
    def test_has_version(self):
 
2135
        index = self.two_graph_index()
 
2136
        self.assertTrue(index.has_version('tail'))
 
2137
        self.assertFalse(index.has_version('ghost'))
 
2138
 
 
2139
    def test_get_position(self):
 
2140
        index = self.two_graph_index()
 
2141
        self.assertEqual((index._graph_index._indices[0], 0, 100), index.get_position('tip'))
 
2142
        self.assertEqual((index._graph_index._indices[1], 100, 78), index.get_position('parent'))
 
2143
 
 
2144
    def test_get_method_deltas(self):
 
2145
        index = self.two_graph_index(deltas=True)
 
2146
        self.assertEqual('fulltext', index.get_method('tip'))
 
2147
        self.assertEqual('line-delta', index.get_method('parent'))
 
2148
 
 
2149
    def test_get_method_no_deltas(self):
 
2150
        # check that the parent-history lookup is ignored with deltas=False.
 
2151
        index = self.two_graph_index(deltas=False)
 
2152
        self.assertEqual('fulltext', index.get_method('tip'))
 
2153
        self.assertEqual('fulltext', index.get_method('parent'))
 
2154
 
 
2155
    def test_get_options_deltas(self):
 
2156
        index = self.two_graph_index(deltas=True)
 
2157
        self.assertEqual(['fulltext', 'no-eol'], index.get_options('tip'))
 
2158
        self.assertEqual(['line-delta'], index.get_options('parent'))
 
2159
 
 
2160
    def test_get_options_no_deltas(self):
 
2161
        # check that the parent-history lookup is ignored with deltas=False.
 
2162
        index = self.two_graph_index(deltas=False)
 
2163
        self.assertEqual(['fulltext', 'no-eol'], index.get_options('tip'))
 
2164
        self.assertEqual(['fulltext'], index.get_options('parent'))
 
2165
 
 
2166
    def test_get_parents(self):
 
2167
        # get_parents ignores ghosts
 
2168
        index = self.two_graph_index()
 
2169
        self.assertEqual(('tail', ), index.get_parents('parent'))
 
2170
        # and errors on ghosts.
 
2171
        self.assertRaises(errors.RevisionNotPresent,
 
2172
            index.get_parents, 'ghost')
 
2173
 
 
2174
    def test_get_parents_with_ghosts(self):
 
2175
        index = self.two_graph_index()
 
2176
        self.assertEqual(('tail', 'ghost'), index.get_parents_with_ghosts('parent'))
 
2177
        # and errors on ghosts.
 
2178
        self.assertRaises(errors.RevisionNotPresent,
 
2179
            index.get_parents_with_ghosts, 'ghost')
 
2180
 
 
2181
    def test_check_versions_present(self):
 
2182
        # ghosts should not be considered present
 
2183
        index = self.two_graph_index()
 
2184
        self.assertRaises(RevisionNotPresent, index.check_versions_present,
 
2185
            ['ghost'])
 
2186
        self.assertRaises(RevisionNotPresent, index.check_versions_present,
 
2187
            ['tail', 'ghost'])
 
2188
        index.check_versions_present(['tail', 'separate'])
 
2189
 
 
2190
    def catch_add(self, entries):
 
2191
        self.caught_entries.append(entries)
 
2192
 
 
2193
    def test_add_no_callback_errors(self):
 
2194
        index = self.two_graph_index()
 
2195
        self.assertRaises(errors.ReadOnlyError, index.add_version,
 
2196
            'new', 'fulltext,no-eol', (None, 50, 60), ['separate'])
 
2197
 
 
2198
    def test_add_version_smoke(self):
 
2199
        index = self.two_graph_index(catch_adds=True)
 
2200
        index.add_version('new', 'fulltext,no-eol', (None, 50, 60), ['separate'])
 
2201
        self.assertEqual([[(('new', ), 'N50 60', ((('separate',),),))]],
 
2202
            self.caught_entries)
 
2203
 
 
2204
    def test_add_version_delta_not_delta_index(self):
 
2205
        index = self.two_graph_index(catch_adds=True)
 
2206
        self.assertRaises(errors.KnitCorrupt, index.add_version,
 
2207
            'new', 'no-eol,line-delta', (None, 0, 100), ['parent'])
 
2208
        self.assertEqual([], self.caught_entries)
 
2209
 
 
2210
    def test_add_version_same_dup(self):
 
2211
        index = self.two_graph_index(catch_adds=True)
 
2212
        # options can be spelt two different ways
 
2213
        index.add_version('tip', 'fulltext,no-eol', (None, 0, 100), ['parent'])
 
2214
        index.add_version('tip', 'no-eol,fulltext', (None, 0, 100), ['parent'])
 
2215
        # but neither should have added data.
 
2216
        self.assertEqual([[], []], self.caught_entries)
 
2217
        
 
2218
    def test_add_version_different_dup(self):
 
2219
        index = self.two_graph_index(deltas=True, catch_adds=True)
 
2220
        # change options
 
2221
        self.assertRaises(errors.KnitCorrupt, index.add_version,
 
2222
            'tip', 'no-eol,line-delta', (None, 0, 100), ['parent'])
 
2223
        self.assertRaises(errors.KnitCorrupt, index.add_version,
 
2224
            'tip', 'line-delta,no-eol', (None, 0, 100), ['parent'])
 
2225
        self.assertRaises(errors.KnitCorrupt, index.add_version,
 
2226
            'tip', 'fulltext', (None, 0, 100), ['parent'])
 
2227
        # position/length
 
2228
        self.assertRaises(errors.KnitCorrupt, index.add_version,
 
2229
            'tip', 'fulltext,no-eol', (None, 50, 100), ['parent'])
 
2230
        self.assertRaises(errors.KnitCorrupt, index.add_version,
 
2231
            'tip', 'fulltext,no-eol', (None, 0, 1000), ['parent'])
 
2232
        # parents
 
2233
        self.assertRaises(errors.KnitCorrupt, index.add_version,
 
2234
            'tip', 'fulltext,no-eol', (None, 0, 100), [])
 
2235
        self.assertEqual([], self.caught_entries)
 
2236
        
 
2237
    def test_add_versions_nodeltas(self):
 
2238
        index = self.two_graph_index(catch_adds=True)
 
2239
        index.add_versions([
 
2240
                ('new', 'fulltext,no-eol', (None, 50, 60), ['separate']),
 
2241
                ('new2', 'fulltext', (None, 0, 6), ['new']),
 
2242
                ])
 
2243
        self.assertEqual([(('new', ), 'N50 60', ((('separate',),),)),
 
2244
            (('new2', ), ' 0 6', ((('new',),),))],
 
2245
            sorted(self.caught_entries[0]))
 
2246
        self.assertEqual(1, len(self.caught_entries))
 
2247
 
 
2248
    def test_add_versions_deltas(self):
 
2249
        index = self.two_graph_index(deltas=True, catch_adds=True)
 
2250
        index.add_versions([
 
2251
                ('new', 'fulltext,no-eol', (None, 50, 60), ['separate']),
 
2252
                ('new2', 'line-delta', (None, 0, 6), ['new']),
 
2253
                ])
 
2254
        self.assertEqual([(('new', ), 'N50 60', ((('separate',),), ())),
 
2255
            (('new2', ), ' 0 6', ((('new',),), (('new',),), ))],
 
2256
            sorted(self.caught_entries[0]))
 
2257
        self.assertEqual(1, len(self.caught_entries))
 
2258
 
 
2259
    def test_add_versions_delta_not_delta_index(self):
 
2260
        index = self.two_graph_index(catch_adds=True)
 
2261
        self.assertRaises(errors.KnitCorrupt, index.add_versions,
 
2262
            [('new', 'no-eol,line-delta', (None, 0, 100), ['parent'])])
 
2263
        self.assertEqual([], self.caught_entries)
 
2264
 
 
2265
    def test_add_versions_same_dup(self):
 
2266
        index = self.two_graph_index(catch_adds=True)
 
2267
        # options can be spelt two different ways
 
2268
        index.add_versions([('tip', 'fulltext,no-eol', (None, 0, 100), ['parent'])])
 
2269
        index.add_versions([('tip', 'no-eol,fulltext', (None, 0, 100), ['parent'])])
 
2270
        # but neither should have added data.
 
2271
        self.assertEqual([[], []], self.caught_entries)
 
2272
        
 
2273
    def test_add_versions_different_dup(self):
 
2274
        index = self.two_graph_index(deltas=True, catch_adds=True)
 
2275
        # change options
 
2276
        self.assertRaises(errors.KnitCorrupt, index.add_versions,
 
2277
            [('tip', 'no-eol,line-delta', (None, 0, 100), ['parent'])])
 
2278
        self.assertRaises(errors.KnitCorrupt, index.add_versions,
 
2279
            [('tip', 'line-delta,no-eol', (None, 0, 100), ['parent'])])
 
2280
        self.assertRaises(errors.KnitCorrupt, index.add_versions,
 
2281
            [('tip', 'fulltext', (None, 0, 100), ['parent'])])
 
2282
        # position/length
 
2283
        self.assertRaises(errors.KnitCorrupt, index.add_versions,
 
2284
            [('tip', 'fulltext,no-eol', (None, 50, 100), ['parent'])])
 
2285
        self.assertRaises(errors.KnitCorrupt, index.add_versions,
 
2286
            [('tip', 'fulltext,no-eol', (None, 0, 1000), ['parent'])])
 
2287
        # parents
 
2288
        self.assertRaises(errors.KnitCorrupt, index.add_versions,
 
2289
            [('tip', 'fulltext,no-eol', (None, 0, 100), [])])
 
2290
        # change options in the second record
 
2291
        self.assertRaises(errors.KnitCorrupt, index.add_versions,
 
2292
            [('tip', 'fulltext,no-eol', (None, 0, 100), ['parent']),
 
2293
             ('tip', 'no-eol,line-delta', (None, 0, 100), ['parent'])])
 
2294
        self.assertEqual([], self.caught_entries)
 
2295
 
 
2296
    def test_iter_parents(self):
 
2297
        index1 = self.make_g_index('1', 1, [
 
2298
        # no parents
 
2299
            (('r0', ), 'N0 100', ([], )),
 
2300
        # 1 parent
 
2301
            (('r1', ), '', ([('r0', )], ))])
 
2302
        index2 = self.make_g_index('2', 1, [
 
2303
        # 2 parents
 
2304
            (('r2', ), 'N0 100', ([('r1', ), ('r0', )], )),
 
2305
            ])
 
2306
        combined_index = CombinedGraphIndex([index1, index2])
 
2307
        index = KnitGraphIndex(combined_index)
 
2308
        # XXX TODO a ghost
 
2309
        # cases: each sample data individually:
 
2310
        self.assertEqual(set([('r0', ())]),
 
2311
            set(index.iter_parents(['r0'])))
 
2312
        self.assertEqual(set([('r1', ('r0', ))]),
 
2313
            set(index.iter_parents(['r1'])))
 
2314
        self.assertEqual(set([('r2', ('r1', 'r0'))]),
 
2315
            set(index.iter_parents(['r2'])))
 
2316
        # no nodes returned for a missing node
 
2317
        self.assertEqual(set(),
 
2318
            set(index.iter_parents(['missing'])))
 
2319
        # 1 node returned with missing nodes skipped
 
2320
        self.assertEqual(set([('r1', ('r0', ))]),
 
2321
            set(index.iter_parents(['ghost1', 'r1', 'ghost'])))
 
2322
        # 2 nodes returned
 
2323
        self.assertEqual(set([('r0', ()), ('r1', ('r0', ))]),
 
2324
            set(index.iter_parents(['r0', 'r1'])))
 
2325
        # 2 nodes returned, missing skipped
 
2326
        self.assertEqual(set([('r0', ()), ('r1', ('r0', ))]),
 
2327
            set(index.iter_parents(['a', 'r0', 'b', 'r1', 'c'])))
 
2328
 
 
2329
 
 
2330
class TestNoParentsGraphIndexKnit(KnitTests):
 
2331
    """Tests for knits using KnitGraphIndex with no parents."""
 
2332
 
 
2333
    def make_g_index(self, name, ref_lists=0, nodes=[]):
 
2334
        builder = GraphIndexBuilder(ref_lists)
 
2335
        for node, references in nodes:
 
2336
            builder.add_node(node, references)
 
2337
        stream = builder.finish()
 
2338
        trans = self.get_transport()
 
2339
        trans.put_file(name, stream)
 
2340
        return GraphIndex(trans, name)
 
2341
 
 
2342
    def test_parents_deltas_incompatible(self):
 
2343
        index = CombinedGraphIndex([])
 
2344
        self.assertRaises(errors.KnitError, KnitGraphIndex, index,
 
2345
            deltas=True, parents=False)
 
2346
 
 
2347
    def two_graph_index(self, catch_adds=False):
 
2348
        """Build a two-graph index.
 
2349
 
 
2350
        :param deltas: If true, use underlying indices with two node-ref
 
2351
            lists and 'parent' set to a delta-compressed against tail.
 
2352
        """
 
2353
        # put several versions in the index.
 
2354
        index1 = self.make_g_index('1', 0, [
 
2355
            (('tip', ), 'N0 100'),
 
2356
            (('tail', ), '')])
 
2357
        index2 = self.make_g_index('2', 0, [
 
2358
            (('parent', ), ' 100 78'),
 
2359
            (('separate', ), '')])
 
2360
        combined_index = CombinedGraphIndex([index1, index2])
 
2361
        if catch_adds:
 
2362
            self.combined_index = combined_index
 
2363
            self.caught_entries = []
 
2364
            add_callback = self.catch_add
 
2365
        else:
 
2366
            add_callback = None
 
2367
        return KnitGraphIndex(combined_index, parents=False,
 
2368
            add_callback=add_callback)
 
2369
 
 
2370
    def test_get_graph(self):
 
2371
        index = self.two_graph_index()
 
2372
        self.assertEqual(set([
 
2373
            ('tip', ()),
 
2374
            ('tail', ()),
 
2375
            ('parent', ()),
 
2376
            ('separate', ()),
 
2377
            ]), set(index.get_graph()))
 
2378
 
 
2379
    def test_get_ancestry(self):
 
2380
        # with no parents, ancestry is always just the key.
 
2381
        index = self.two_graph_index()
 
2382
        self.assertEqual([], index.get_ancestry([]))
 
2383
        self.assertEqual(['separate'], index.get_ancestry(['separate']))
 
2384
        self.assertEqual(['tail'], index.get_ancestry(['tail']))
 
2385
        self.assertEqual(['parent'], index.get_ancestry(['parent']))
 
2386
        self.assertEqual(['tip'], index.get_ancestry(['tip']))
 
2387
        self.assertTrue(index.get_ancestry(['tip', 'separate']) in
 
2388
            (['tip', 'separate'],
 
2389
             ['separate', 'tip'],
 
2390
            ))
 
2391
        # asking for a ghost makes it go boom.
 
2392
        self.assertRaises(errors.RevisionNotPresent, index.get_ancestry, ['ghost'])
 
2393
 
 
2394
    def test_get_ancestry_with_ghosts(self):
 
2395
        index = self.two_graph_index()
 
2396
        self.assertEqual([], index.get_ancestry_with_ghosts([]))
 
2397
        self.assertEqual(['separate'], index.get_ancestry_with_ghosts(['separate']))
 
2398
        self.assertEqual(['tail'], index.get_ancestry_with_ghosts(['tail']))
 
2399
        self.assertEqual(['parent'], index.get_ancestry_with_ghosts(['parent']))
 
2400
        self.assertEqual(['tip'], index.get_ancestry_with_ghosts(['tip']))
 
2401
        self.assertTrue(index.get_ancestry_with_ghosts(['tip', 'separate']) in
 
2402
            (['tip', 'separate'],
 
2403
             ['separate', 'tip'],
 
2404
            ))
 
2405
        # asking for a ghost makes it go boom.
 
2406
        self.assertRaises(errors.RevisionNotPresent, index.get_ancestry_with_ghosts, ['ghost'])
 
2407
 
 
2408
    def test_num_versions(self):
 
2409
        index = self.two_graph_index()
 
2410
        self.assertEqual(4, index.num_versions())
 
2411
 
 
2412
    def test_get_versions(self):
 
2413
        index = self.two_graph_index()
 
2414
        self.assertEqual(set(['tail', 'tip', 'parent', 'separate']),
 
2415
            set(index.get_versions()))
 
2416
 
 
2417
    def test_has_version(self):
 
2418
        index = self.two_graph_index()
 
2419
        self.assertTrue(index.has_version('tail'))
 
2420
        self.assertFalse(index.has_version('ghost'))
 
2421
 
 
2422
    def test_get_position(self):
 
2423
        index = self.two_graph_index()
 
2424
        self.assertEqual((index._graph_index._indices[0], 0, 100), index.get_position('tip'))
 
2425
        self.assertEqual((index._graph_index._indices[1], 100, 78), index.get_position('parent'))
 
2426
 
 
2427
    def test_get_method(self):
 
2428
        index = self.two_graph_index()
 
2429
        self.assertEqual('fulltext', index.get_method('tip'))
 
2430
        self.assertEqual(['fulltext'], index.get_options('parent'))
 
2431
 
 
2432
    def test_get_options(self):
 
2433
        index = self.two_graph_index()
 
2434
        self.assertEqual(['fulltext', 'no-eol'], index.get_options('tip'))
 
2435
        self.assertEqual(['fulltext'], index.get_options('parent'))
 
2436
 
 
2437
    def test_get_parents(self):
 
2438
        index = self.two_graph_index()
 
2439
        self.assertEqual((), index.get_parents('parent'))
 
2440
        # and errors on ghosts.
 
2441
        self.assertRaises(errors.RevisionNotPresent,
 
2442
            index.get_parents, 'ghost')
 
2443
 
 
2444
    def test_get_parents_with_ghosts(self):
 
2445
        index = self.two_graph_index()
 
2446
        self.assertEqual((), index.get_parents_with_ghosts('parent'))
 
2447
        # and errors on ghosts.
 
2448
        self.assertRaises(errors.RevisionNotPresent,
 
2449
            index.get_parents_with_ghosts, 'ghost')
 
2450
 
 
2451
    def test_check_versions_present(self):
 
2452
        index = self.two_graph_index()
 
2453
        self.assertRaises(RevisionNotPresent, index.check_versions_present,
 
2454
            ['missing'])
 
2455
        self.assertRaises(RevisionNotPresent, index.check_versions_present,
 
2456
            ['tail', 'missing'])
 
2457
        index.check_versions_present(['tail', 'separate'])
 
2458
 
 
2459
    def catch_add(self, entries):
 
2460
        self.caught_entries.append(entries)
 
2461
 
 
2462
    def test_add_no_callback_errors(self):
 
2463
        index = self.two_graph_index()
 
2464
        self.assertRaises(errors.ReadOnlyError, index.add_version,
 
2465
            'new', 'fulltext,no-eol', (None, 50, 60), ['separate'])
 
2466
 
 
2467
    def test_add_version_smoke(self):
 
2468
        index = self.two_graph_index(catch_adds=True)
 
2469
        index.add_version('new', 'fulltext,no-eol', (None, 50, 60), [])
 
2470
        self.assertEqual([[(('new', ), 'N50 60')]],
 
2471
            self.caught_entries)
 
2472
 
 
2473
    def test_add_version_delta_not_delta_index(self):
 
2474
        index = self.two_graph_index(catch_adds=True)
 
2475
        self.assertRaises(errors.KnitCorrupt, index.add_version,
 
2476
            'new', 'no-eol,line-delta', (None, 0, 100), [])
 
2477
        self.assertEqual([], self.caught_entries)
 
2478
 
 
2479
    def test_add_version_same_dup(self):
 
2480
        index = self.two_graph_index(catch_adds=True)
 
2481
        # options can be spelt two different ways
 
2482
        index.add_version('tip', 'fulltext,no-eol', (None, 0, 100), [])
 
2483
        index.add_version('tip', 'no-eol,fulltext', (None, 0, 100), [])
 
2484
        # but neither should have added data.
 
2485
        self.assertEqual([[], []], self.caught_entries)
 
2486
        
 
2487
    def test_add_version_different_dup(self):
 
2488
        index = self.two_graph_index(catch_adds=True)
 
2489
        # change options
 
2490
        self.assertRaises(errors.KnitCorrupt, index.add_version,
 
2491
            'tip', 'no-eol,line-delta', (None, 0, 100), [])
 
2492
        self.assertRaises(errors.KnitCorrupt, index.add_version,
 
2493
            'tip', 'line-delta,no-eol', (None, 0, 100), [])
 
2494
        self.assertRaises(errors.KnitCorrupt, index.add_version,
 
2495
            'tip', 'fulltext', (None, 0, 100), [])
 
2496
        # position/length
 
2497
        self.assertRaises(errors.KnitCorrupt, index.add_version,
 
2498
            'tip', 'fulltext,no-eol', (None, 50, 100), [])
 
2499
        self.assertRaises(errors.KnitCorrupt, index.add_version,
 
2500
            'tip', 'fulltext,no-eol', (None, 0, 1000), [])
 
2501
        # parents
 
2502
        self.assertRaises(errors.KnitCorrupt, index.add_version,
 
2503
            'tip', 'fulltext,no-eol', (None, 0, 100), ['parent'])
 
2504
        self.assertEqual([], self.caught_entries)
 
2505
        
 
2506
    def test_add_versions(self):
 
2507
        index = self.two_graph_index(catch_adds=True)
 
2508
        index.add_versions([
 
2509
                ('new', 'fulltext,no-eol', (None, 50, 60), []),
 
2510
                ('new2', 'fulltext', (None, 0, 6), []),
 
2511
                ])
 
2512
        self.assertEqual([(('new', ), 'N50 60'), (('new2', ), ' 0 6')],
 
2513
            sorted(self.caught_entries[0]))
 
2514
        self.assertEqual(1, len(self.caught_entries))
 
2515
 
 
2516
    def test_add_versions_delta_not_delta_index(self):
 
2517
        index = self.two_graph_index(catch_adds=True)
 
2518
        self.assertRaises(errors.KnitCorrupt, index.add_versions,
 
2519
            [('new', 'no-eol,line-delta', (None, 0, 100), ['parent'])])
 
2520
        self.assertEqual([], self.caught_entries)
 
2521
 
 
2522
    def test_add_versions_parents_not_parents_index(self):
 
2523
        index = self.two_graph_index(catch_adds=True)
 
2524
        self.assertRaises(errors.KnitCorrupt, index.add_versions,
 
2525
            [('new', 'no-eol,fulltext', (None, 0, 100), ['parent'])])
 
2526
        self.assertEqual([], self.caught_entries)
 
2527
 
 
2528
    def test_add_versions_same_dup(self):
 
2529
        index = self.two_graph_index(catch_adds=True)
 
2530
        # options can be spelt two different ways
 
2531
        index.add_versions([('tip', 'fulltext,no-eol', (None, 0, 100), [])])
 
2532
        index.add_versions([('tip', 'no-eol,fulltext', (None, 0, 100), [])])
 
2533
        # but neither should have added data.
 
2534
        self.assertEqual([[], []], self.caught_entries)
 
2535
        
 
2536
    def test_add_versions_different_dup(self):
 
2537
        index = self.two_graph_index(catch_adds=True)
 
2538
        # change options
 
2539
        self.assertRaises(errors.KnitCorrupt, index.add_versions,
 
2540
            [('tip', 'no-eol,line-delta', (None, 0, 100), [])])
 
2541
        self.assertRaises(errors.KnitCorrupt, index.add_versions,
 
2542
            [('tip', 'line-delta,no-eol', (None, 0, 100), [])])
 
2543
        self.assertRaises(errors.KnitCorrupt, index.add_versions,
 
2544
            [('tip', 'fulltext', (None, 0, 100), [])])
 
2545
        # position/length
 
2546
        self.assertRaises(errors.KnitCorrupt, index.add_versions,
 
2547
            [('tip', 'fulltext,no-eol', (None, 50, 100), [])])
 
2548
        self.assertRaises(errors.KnitCorrupt, index.add_versions,
 
2549
            [('tip', 'fulltext,no-eol', (None, 0, 1000), [])])
 
2550
        # parents
 
2551
        self.assertRaises(errors.KnitCorrupt, index.add_versions,
 
2552
            [('tip', 'fulltext,no-eol', (None, 0, 100), ['parent'])])
 
2553
        # change options in the second record
 
2554
        self.assertRaises(errors.KnitCorrupt, index.add_versions,
 
2555
            [('tip', 'fulltext,no-eol', (None, 0, 100), []),
 
2556
             ('tip', 'no-eol,line-delta', (None, 0, 100), [])])
 
2557
        self.assertEqual([], self.caught_entries)
 
2558
 
 
2559
    def test_iter_parents(self):
 
2560
        index = self.two_graph_index()
 
2561
        self.assertEqual(set([
 
2562
            ('tip', ()), ('tail', ()), ('parent', ()), ('separate', ())
 
2563
            ]),
 
2564
            set(index.iter_parents(['tip', 'tail', 'ghost', 'parent', 'separate'])))
 
2565
        self.assertEqual(set([('tip', ())]),
 
2566
            set(index.iter_parents(['tip'])))
 
2567
        self.assertEqual(set(),
 
2568
            set(index.iter_parents([])))