/brz/remove-bazaar

To get this branch, use:
bzr branch http://gegoxaren.bato24.eu/bzr/brz/remove-bazaar

« back to all changes in this revision

Viewing changes to bzrlib/tests/test_knit.py

Merge from bzr.dev.

Show diffs side-by-side

added added

removed removed

Lines of Context:
36
36
    )
37
37
from bzrlib.index import *
38
38
from bzrlib.knit import (
 
39
    AnnotatedKnitContent,
39
40
    KnitContent,
40
41
    KnitGraphIndex,
41
42
    KnitVersionedFile,
45
46
    _KnitData,
46
47
    _KnitIndex,
47
48
    _PackAccess,
 
49
    PlainKnitContent,
48
50
    WeaveToKnit,
49
51
    KnitSequenceMatcher,
50
52
    )
55
57
    TestCaseWithMemoryTransport,
56
58
    TestCaseWithTransport,
57
59
    )
58
 
from bzrlib.transport import TransportLogger, get_transport
 
60
from bzrlib.transport import get_transport
59
61
from bzrlib.transport.memory import MemoryTransport
60
62
from bzrlib.util import bencode
61
63
from bzrlib.weave import Weave
76
78
CompiledKnitFeature = _CompiledKnitFeature()
77
79
 
78
80
 
79
 
class KnitContentTests(TestCase):
 
81
class KnitContentTestsMixin(object):
80
82
 
81
83
    def test_constructor(self):
82
 
        content = KnitContent([])
 
84
        content = self._make_content([])
83
85
 
84
86
    def test_text(self):
85
 
        content = KnitContent([])
 
87
        content = self._make_content([])
86
88
        self.assertEqual(content.text(), [])
87
89
 
88
 
        content = KnitContent([("origin1", "text1"), ("origin2", "text2")])
 
90
        content = self._make_content([("origin1", "text1"), ("origin2", "text2")])
89
91
        self.assertEqual(content.text(), ["text1", "text2"])
90
92
 
91
 
    def test_annotate(self):
92
 
        content = KnitContent([])
93
 
        self.assertEqual(content.annotate(), [])
94
 
 
95
 
        content = KnitContent([("origin1", "text1"), ("origin2", "text2")])
 
93
    def test_copy(self):
 
94
        content = self._make_content([("origin1", "text1"), ("origin2", "text2")])
 
95
        copy = content.copy()
 
96
        self.assertIsInstance(copy, content.__class__)
 
97
        self.assertEqual(copy.annotate(), content.annotate())
 
98
 
 
99
    def assertDerivedBlocksEqual(self, source, target, noeol=False):
 
100
        """Assert that the derived matching blocks match real output"""
 
101
        source_lines = source.splitlines(True)
 
102
        target_lines = target.splitlines(True)
 
103
        def nl(line):
 
104
            if noeol and not line.endswith('\n'):
 
105
                return line + '\n'
 
106
            else:
 
107
                return line
 
108
        source_content = self._make_content([(None, nl(l)) for l in source_lines])
 
109
        target_content = self._make_content([(None, nl(l)) for l in target_lines])
 
110
        line_delta = source_content.line_delta(target_content)
 
111
        delta_blocks = list(KnitContent.get_line_delta_blocks(line_delta,
 
112
            source_lines, target_lines))
 
113
        matcher = KnitSequenceMatcher(None, source_lines, target_lines)
 
114
        matcher_blocks = list(list(matcher.get_matching_blocks()))
 
115
        self.assertEqual(matcher_blocks, delta_blocks)
 
116
 
 
117
    def test_get_line_delta_blocks(self):
 
118
        self.assertDerivedBlocksEqual('a\nb\nc\n', 'q\nc\n')
 
119
        self.assertDerivedBlocksEqual(TEXT_1, TEXT_1)
 
120
        self.assertDerivedBlocksEqual(TEXT_1, TEXT_1A)
 
121
        self.assertDerivedBlocksEqual(TEXT_1, TEXT_1B)
 
122
        self.assertDerivedBlocksEqual(TEXT_1B, TEXT_1A)
 
123
        self.assertDerivedBlocksEqual(TEXT_1A, TEXT_1B)
 
124
        self.assertDerivedBlocksEqual(TEXT_1A, '')
 
125
        self.assertDerivedBlocksEqual('', TEXT_1A)
 
126
        self.assertDerivedBlocksEqual('', '')
 
127
        self.assertDerivedBlocksEqual('a\nb\nc', 'a\nb\nc\nd')
 
128
 
 
129
    def test_get_line_delta_blocks_noeol(self):
 
130
        """Handle historical knit deltas safely
 
131
 
 
132
        Some existing knit deltas don't consider the last line to differ
 
133
        when the only difference is whether it has a final newline.
 
134
 
 
135
        New knit deltas appear to always consider the last line to differ
 
136
        in this case.
 
137
        """
 
138
        self.assertDerivedBlocksEqual('a\nb\nc', 'a\nb\nc\nd\n', noeol=True)
 
139
        self.assertDerivedBlocksEqual('a\nb\nc\nd\n', 'a\nb\nc', noeol=True)
 
140
        self.assertDerivedBlocksEqual('a\nb\nc\n', 'a\nb\nc', noeol=True)
 
141
        self.assertDerivedBlocksEqual('a\nb\nc', 'a\nb\nc\n', noeol=True)
 
142
 
 
143
 
 
144
class TestPlainKnitContent(TestCase, KnitContentTestsMixin):
 
145
 
 
146
    def _make_content(self, lines):
 
147
        annotated_content = AnnotatedKnitContent(lines)
 
148
        return PlainKnitContent(annotated_content.text(), 'bogus')
 
149
 
 
150
    def test_annotate(self):
 
151
        content = self._make_content([])
 
152
        self.assertEqual(content.annotate(), [])
 
153
 
 
154
        content = self._make_content([("origin1", "text1"), ("origin2", "text2")])
 
155
        self.assertEqual(content.annotate(),
 
156
            [("bogus", "text1"), ("bogus", "text2")])
 
157
 
 
158
    def test_annotate_iter(self):
 
159
        content = self._make_content([])
 
160
        it = content.annotate_iter()
 
161
        self.assertRaises(StopIteration, it.next)
 
162
 
 
163
        content = self._make_content([("bogus", "text1"), ("bogus", "text2")])
 
164
        it = content.annotate_iter()
 
165
        self.assertEqual(it.next(), ("bogus", "text1"))
 
166
        self.assertEqual(it.next(), ("bogus", "text2"))
 
167
        self.assertRaises(StopIteration, it.next)
 
168
 
 
169
    def test_line_delta(self):
 
170
        content1 = self._make_content([("", "a"), ("", "b")])
 
171
        content2 = self._make_content([("", "a"), ("", "a"), ("", "c")])
 
172
        self.assertEqual(content1.line_delta(content2),
 
173
            [(1, 2, 2, ["a", "c"])])
 
174
 
 
175
    def test_line_delta_iter(self):
 
176
        content1 = self._make_content([("", "a"), ("", "b")])
 
177
        content2 = self._make_content([("", "a"), ("", "a"), ("", "c")])
 
178
        it = content1.line_delta_iter(content2)
 
179
        self.assertEqual(it.next(), (1, 2, 2, ["a", "c"]))
 
180
        self.assertRaises(StopIteration, it.next)
 
181
 
 
182
 
 
183
class TestAnnotatedKnitContent(TestCase, KnitContentTestsMixin):
 
184
 
 
185
    def _make_content(self, lines):
 
186
        return AnnotatedKnitContent(lines)
 
187
 
 
188
    def test_annotate(self):
 
189
        content = self._make_content([])
 
190
        self.assertEqual(content.annotate(), [])
 
191
 
 
192
        content = self._make_content([("origin1", "text1"), ("origin2", "text2")])
96
193
        self.assertEqual(content.annotate(),
97
194
            [("origin1", "text1"), ("origin2", "text2")])
98
195
 
99
196
    def test_annotate_iter(self):
100
 
        content = KnitContent([])
 
197
        content = self._make_content([])
101
198
        it = content.annotate_iter()
102
199
        self.assertRaises(StopIteration, it.next)
103
200
 
104
 
        content = KnitContent([("origin1", "text1"), ("origin2", "text2")])
 
201
        content = self._make_content([("origin1", "text1"), ("origin2", "text2")])
105
202
        it = content.annotate_iter()
106
203
        self.assertEqual(it.next(), ("origin1", "text1"))
107
204
        self.assertEqual(it.next(), ("origin2", "text2"))
108
205
        self.assertRaises(StopIteration, it.next)
109
206
 
110
 
    def test_copy(self):
111
 
        content = KnitContent([("origin1", "text1"), ("origin2", "text2")])
112
 
        copy = content.copy()
113
 
        self.assertIsInstance(copy, KnitContent)
114
 
        self.assertEqual(copy.annotate(),
115
 
            [("origin1", "text1"), ("origin2", "text2")])
116
 
 
117
207
    def test_line_delta(self):
118
 
        content1 = KnitContent([("", "a"), ("", "b")])
119
 
        content2 = KnitContent([("", "a"), ("", "a"), ("", "c")])
 
208
        content1 = self._make_content([("", "a"), ("", "b")])
 
209
        content2 = self._make_content([("", "a"), ("", "a"), ("", "c")])
120
210
        self.assertEqual(content1.line_delta(content2),
121
211
            [(1, 2, 2, [("", "a"), ("", "c")])])
122
212
 
123
213
    def test_line_delta_iter(self):
124
 
        content1 = KnitContent([("", "a"), ("", "b")])
125
 
        content2 = KnitContent([("", "a"), ("", "a"), ("", "c")])
 
214
        content1 = self._make_content([("", "a"), ("", "b")])
 
215
        content2 = self._make_content([("", "a"), ("", "a"), ("", "c")])
126
216
        it = content1.line_delta_iter(content2)
127
217
        self.assertEqual(it.next(), (1, 2, 2, [("", "a"), ("", "c")]))
128
218
        self.assertRaises(StopIteration, it.next)
726
816
        self.assertEqual(["c"], index.get_parents_with_ghosts("a"))
727
817
        self.assertEqual(["a"], index.get_parents_with_ghosts("b"))
728
818
 
 
819
    def test_add_versions_random_id_is_accepted(self):
 
820
        transport = MockTransport([
 
821
            _KnitIndex.HEADER
 
822
            ])
 
823
        index = self.get_knit_index(transport, "filename", "r")
 
824
 
 
825
        index.add_versions([
 
826
            ("a", ["option"], (None, 0, 1), ["b"]),
 
827
            ("a", ["opt"], (None, 1, 2), ["c"]),
 
828
            ("b", ["option"], (None, 2, 3), ["a"])
 
829
            ], random_id=True)
 
830
 
729
831
    def test_delay_create_and_add_versions(self):
730
832
        transport = MockTransport()
731
833
 
1074
1176
    def test_delta(self):
1075
1177
        """Expression of knit delta as lines"""
1076
1178
        k = self.make_test_knit()
1077
 
        KnitContent
1078
1179
        td = list(line_delta(TEXT_1.splitlines(True),
1079
1180
                             TEXT_1A.splitlines(True)))
1080
1181
        self.assertEqualDiff(''.join(td), delta_1_1a)
1081
1182
        out = apply_line_delta(TEXT_1.splitlines(True), td)
1082
1183
        self.assertEqualDiff(''.join(out), TEXT_1A)
1083
1184
 
1084
 
    def assertDerivedBlocksEqual(self, source, target, noeol=False):
1085
 
        """Assert that the derived matching blocks match real output"""
1086
 
        source_lines = source.splitlines(True)
1087
 
        target_lines = target.splitlines(True)
1088
 
        def nl(line):
1089
 
            if noeol and not line.endswith('\n'):
1090
 
                return line + '\n'
1091
 
            else:
1092
 
                return line
1093
 
        source_content = KnitContent([(None, nl(l)) for l in source_lines])
1094
 
        target_content = KnitContent([(None, nl(l)) for l in target_lines])
1095
 
        line_delta = source_content.line_delta(target_content)
1096
 
        delta_blocks = list(KnitContent.get_line_delta_blocks(line_delta,
1097
 
            source_lines, target_lines))
1098
 
        matcher = KnitSequenceMatcher(None, source_lines, target_lines)
1099
 
        matcher_blocks = list(list(matcher.get_matching_blocks()))
1100
 
        self.assertEqual(matcher_blocks, delta_blocks)
1101
 
 
1102
 
    def test_get_line_delta_blocks(self):
1103
 
        self.assertDerivedBlocksEqual('a\nb\nc\n', 'q\nc\n')
1104
 
        self.assertDerivedBlocksEqual(TEXT_1, TEXT_1)
1105
 
        self.assertDerivedBlocksEqual(TEXT_1, TEXT_1A)
1106
 
        self.assertDerivedBlocksEqual(TEXT_1, TEXT_1B)
1107
 
        self.assertDerivedBlocksEqual(TEXT_1B, TEXT_1A)
1108
 
        self.assertDerivedBlocksEqual(TEXT_1A, TEXT_1B)
1109
 
        self.assertDerivedBlocksEqual(TEXT_1A, '')
1110
 
        self.assertDerivedBlocksEqual('', TEXT_1A)
1111
 
        self.assertDerivedBlocksEqual('', '')
1112
 
        self.assertDerivedBlocksEqual('a\nb\nc', 'a\nb\nc\nd')
1113
 
 
1114
 
    def test_get_line_delta_blocks_noeol(self):
1115
 
        """Handle historical knit deltas safely
1116
 
 
1117
 
        Some existing knit deltas don't consider the last line to differ
1118
 
        when the only difference is whether it has a final newline.
1119
 
 
1120
 
        New knit deltas appear to always consider the last line to differ
1121
 
        in this case.
1122
 
        """
1123
 
        self.assertDerivedBlocksEqual('a\nb\nc', 'a\nb\nc\nd\n', noeol=True)
1124
 
        self.assertDerivedBlocksEqual('a\nb\nc\nd\n', 'a\nb\nc', noeol=True)
1125
 
        self.assertDerivedBlocksEqual('a\nb\nc\n', 'a\nb\nc', noeol=True)
1126
 
        self.assertDerivedBlocksEqual('a\nb\nc', 'a\nb\nc\n', noeol=True)
1127
 
 
1128
1185
    def test_add_with_parents(self):
1129
1186
        """Store in knit with parents"""
1130
1187
        k = self.make_test_knit()
1258
1315
        self.assertEquals(origins[1], ('text-1', 'b\n'))
1259
1316
        self.assertEquals(origins[2], ('text-1', 'c\n'))
1260
1317
 
1261
 
    def test_knit_join(self):
1262
 
        """Store in knit with parents"""
1263
 
        k1 = KnitVersionedFile('test1', get_transport('.'), factory=KnitPlainFactory(), create=True)
1264
 
        k1.add_lines('text-a', [], split_lines(TEXT_1))
1265
 
        k1.add_lines('text-b', ['text-a'], split_lines(TEXT_1))
1266
 
 
1267
 
        k1.add_lines('text-c', [], split_lines(TEXT_1))
1268
 
        k1.add_lines('text-d', ['text-c'], split_lines(TEXT_1))
1269
 
 
1270
 
        k1.add_lines('text-m', ['text-b', 'text-d'], split_lines(TEXT_1))
1271
 
 
1272
 
        k2 = KnitVersionedFile('test2', get_transport('.'), factory=KnitPlainFactory(), create=True)
 
1318
    def _test_join_with_factories(self, k1_factory, k2_factory):
 
1319
        k1 = KnitVersionedFile('test1', get_transport('.'), factory=k1_factory, create=True)
 
1320
        k1.add_lines('text-a', [], ['a1\n', 'a2\n', 'a3\n'])
 
1321
        k1.add_lines('text-b', ['text-a'], ['a1\n', 'b2\n', 'a3\n'])
 
1322
        k1.add_lines('text-c', [], ['c1\n', 'c2\n', 'c3\n'])
 
1323
        k1.add_lines('text-d', ['text-c'], ['c1\n', 'd2\n', 'd3\n'])
 
1324
        k1.add_lines('text-m', ['text-b', 'text-d'], ['a1\n', 'b2\n', 'd3\n'])
 
1325
        k2 = KnitVersionedFile('test2', get_transport('.'), factory=k2_factory, create=True)
1273
1326
        count = k2.join(k1, version_ids=['text-m'])
1274
1327
        self.assertEquals(count, 5)
1275
1328
        self.assertTrue(k2.has_version('text-a'))
1276
1329
        self.assertTrue(k2.has_version('text-c'))
 
1330
        origins = k2.annotate('text-m')
 
1331
        self.assertEquals(origins[0], ('text-a', 'a1\n'))
 
1332
        self.assertEquals(origins[1], ('text-b', 'b2\n'))
 
1333
        self.assertEquals(origins[2], ('text-d', 'd3\n'))
 
1334
 
 
1335
    def test_knit_join_plain_to_plain(self):
 
1336
        """Test joining a plain knit with a plain knit."""
 
1337
        self._test_join_with_factories(KnitPlainFactory(), KnitPlainFactory())
 
1338
 
 
1339
    def test_knit_join_anno_to_anno(self):
 
1340
        """Test joining an annotated knit with an annotated knit."""
 
1341
        self._test_join_with_factories(None, None)
 
1342
 
 
1343
    def test_knit_join_anno_to_plain(self):
 
1344
        """Test joining an annotated knit with a plain knit."""
 
1345
        self._test_join_with_factories(None, KnitPlainFactory())
 
1346
 
 
1347
    def test_knit_join_plain_to_anno(self):
 
1348
        """Test joining a plain knit with an annotated knit."""
 
1349
        self._test_join_with_factories(KnitPlainFactory(), None)
1277
1350
 
1278
1351
    def test_reannotate(self):
1279
1352
        k1 = KnitVersionedFile('knit1', get_transport('.'),
1317
1390
        k1.get_texts(('%d' % t) for t in range(3))
1318
1391
        
1319
1392
    def test_iter_lines_reads_in_order(self):
1320
 
        t = MemoryTransport()
1321
 
        instrumented_t = TransportLogger(t)
 
1393
        instrumented_t = get_transport('trace+memory:///')
1322
1394
        k1 = KnitVersionedFile('id', instrumented_t, create=True, delta=True)
1323
 
        self.assertEqual([('id.kndx',)], instrumented_t._calls)
 
1395
        self.assertEqual([('get', 'id.kndx',)], instrumented_t._activity)
1324
1396
        # add texts with no required ordering
1325
1397
        k1.add_lines('base', [], ['text\n'])
1326
1398
        k1.add_lines('base2', [], ['text2\n'])
1327
1399
        k1.clear_cache()
1328
 
        instrumented_t._calls = []
 
1400
        # clear the logged activity, but preserve the list instance in case of
 
1401
        # clones pointing at it.
 
1402
        del instrumented_t._activity[:]
1329
1403
        # request a last-first iteration
1330
 
        results = list(k1.iter_lines_added_or_present_in_versions(['base2', 'base']))
1331
 
        self.assertEqual([('id.knit', [(0, 87), (87, 89)])], instrumented_t._calls)
 
1404
        results = list(k1.iter_lines_added_or_present_in_versions(
 
1405
            ['base2', 'base']))
 
1406
        self.assertEqual(
 
1407
            [('readv', 'id.knit', [(0, 87), (87, 89)], False, None)],
 
1408
            instrumented_t._activity)
1332
1409
        self.assertEqual(['text\n', 'text2\n'], results)
1333
1410
 
1334
1411
    def test_create_empty_annotated(self):
1850
1927
 
1851
1928
class TestKnitCaching(KnitTests):
1852
1929
    
1853
 
    def create_knit(self, cache_add=False):
 
1930
    def create_knit(self):
1854
1931
        k = self.make_test_knit(True)
1855
 
        if cache_add:
1856
 
            k.enable_cache()
1857
 
 
1858
1932
        k.add_lines('text-1', [], split_lines(TEXT_1))
1859
1933
        k.add_lines('text-2', [], split_lines(TEXT_2))
1860
1934
        return k
1864
1938
        # Nothing should be cached without setting 'enable_cache'
1865
1939
        self.assertEqual({}, k._data._cache)
1866
1940
 
1867
 
    def test_cache_add_and_clear(self):
1868
 
        k = self.create_knit(True)
1869
 
 
1870
 
        self.assertEqual(['text-1', 'text-2'], sorted(k._data._cache.keys()))
1871
 
 
1872
 
        k.clear_cache()
1873
 
        self.assertEqual({}, k._data._cache)
1874
 
 
1875
1941
    def test_cache_data_read_raw(self):
1876
1942
        k = self.create_knit()
1877
1943
 
2262
2328
            [('new', 'no-eol,line-delta', (None, 0, 100), ['parent'])])
2263
2329
        self.assertEqual([], self.caught_entries)
2264
2330
 
 
2331
    def test_add_versions_random_id_accepted(self):
 
2332
        index = self.two_graph_index(catch_adds=True)
 
2333
        index.add_versions([], random_id=True)
 
2334
 
2265
2335
    def test_add_versions_same_dup(self):
2266
2336
        index = self.two_graph_index(catch_adds=True)
2267
2337
        # options can be spelt two different ways
2525
2595
            [('new', 'no-eol,fulltext', (None, 0, 100), ['parent'])])
2526
2596
        self.assertEqual([], self.caught_entries)
2527
2597
 
 
2598
    def test_add_versions_random_id_accepted(self):
 
2599
        index = self.two_graph_index(catch_adds=True)
 
2600
        index.add_versions([], random_id=True)
 
2601
 
2528
2602
    def test_add_versions_same_dup(self):
2529
2603
        index = self.two_graph_index(catch_adds=True)
2530
2604
        # options can be spelt two different ways