/brz/remove-bazaar

To get this branch, use:
bzr branch http://gegoxaren.bato24.eu/bzr/brz/remove-bazaar

« back to all changes in this revision

Viewing changes to bzrlib/tests/test__annotator.py

  • Committer: Canonical.com Patch Queue Manager
  • Date: 2009-07-20 08:56:45 UTC
  • mfrom: (4526.9.23 apply-inventory-delta)
  • Revision ID: pqm@pqm.ubuntu.com-20090720085645-54mtgybxua0yx6hw
(robertc) Add checks for inventory deltas which try to ensure that
        deltas that are not an exact fit are not applied. (Robert
        Collins, bug 397705, bug 367633)

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
# Copyright (C) 2009 Canonical Ltd
 
2
#
 
3
# This program is free software; you can redistribute it and/or modify
 
4
# it under the terms of the GNU General Public License as published by
 
5
# the Free Software Foundation; either version 2 of the License, or
 
6
# (at your option) any later version.
 
7
#
 
8
# This program is distributed in the hope that it will be useful,
 
9
# but WITHOUT ANY WARRANTY; without even the implied warranty of
 
10
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
11
# GNU General Public License for more details.
 
12
#
 
13
# You should have received a copy of the GNU General Public License
 
14
# along with this program; if not, write to the Free Software
 
15
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
 
16
 
 
17
"""Tests for Annotators."""
 
18
 
 
19
from bzrlib import (
 
20
    annotate,
 
21
    _annotator_py,
 
22
    errors,
 
23
    knit,
 
24
    revision,
 
25
    tests,
 
26
    )
 
27
 
 
28
 
 
29
def load_tests(standard_tests, module, loader):
 
30
    """Parameterize tests for all versions of groupcompress."""
 
31
    scenarios = [
 
32
        ('python', {'module': _annotator_py}),
 
33
    ]
 
34
    suite = loader.suiteClass()
 
35
    if CompiledAnnotator.available():
 
36
        from bzrlib import _annotator_pyx
 
37
        scenarios.append(('C', {'module': _annotator_pyx}))
 
38
    else:
 
39
        # the compiled module isn't available, so we add a failing test
 
40
        class FailWithoutFeature(tests.TestCase):
 
41
            def test_fail(self):
 
42
                self.requireFeature(CompiledAnnotator)
 
43
        suite.addTest(loader.loadTestsFromTestCase(FailWithoutFeature))
 
44
    result = tests.multiply_tests(standard_tests, scenarios, suite)
 
45
    return result
 
46
 
 
47
 
 
48
class _CompiledAnnotator(tests.Feature):
 
49
 
 
50
    def _probe(self):
 
51
        try:
 
52
            import bzrlib._annotator_pyx
 
53
        except ImportError:
 
54
            return False
 
55
        return True
 
56
 
 
57
    def feature_name(self):
 
58
        return 'bzrlib._annotator_pyx'
 
59
 
 
60
CompiledAnnotator = _CompiledAnnotator()
 
61
 
 
62
 
 
63
class TestAnnotator(tests.TestCaseWithMemoryTransport):
 
64
 
 
65
    module = None # Set by load_tests
 
66
 
 
67
    fa_key = ('f-id', 'a-id')
 
68
    fb_key = ('f-id', 'b-id')
 
69
    fc_key = ('f-id', 'c-id')
 
70
    fd_key = ('f-id', 'd-id')
 
71
    fe_key = ('f-id', 'e-id')
 
72
    ff_key = ('f-id', 'f-id')
 
73
 
 
74
    def make_no_graph_texts(self):
 
75
        factory = knit.make_pack_factory(False, False, 2)
 
76
        self.vf = factory(self.get_transport())
 
77
        self.ann = self.module.Annotator(self.vf)
 
78
        self.vf.add_lines(self.fa_key, (), ['simple\n', 'content\n'])
 
79
        self.vf.add_lines(self.fb_key, (), ['simple\n', 'new content\n'])
 
80
 
 
81
    def make_simple_text(self):
 
82
        # TODO: all we really need is a VersionedFile instance, we'd like to
 
83
        #       avoid creating all the intermediate stuff
 
84
        factory = knit.make_pack_factory(True, True, 2)
 
85
        self.vf = factory(self.get_transport())
 
86
        # This assumes nothing special happens during __init__, which may be
 
87
        # valid
 
88
        self.ann = self.module.Annotator(self.vf)
 
89
        #  A    'simple|content|'
 
90
        #  |
 
91
        #  B    'simple|new content|'
 
92
        self.vf.add_lines(self.fa_key, [], ['simple\n', 'content\n'])
 
93
        self.vf.add_lines(self.fb_key, [self.fa_key],
 
94
                          ['simple\n', 'new content\n'])
 
95
 
 
96
    def make_merge_text(self):
 
97
        self.make_simple_text()
 
98
        #  A    'simple|content|'
 
99
        #  |\
 
100
        #  B |  'simple|new content|'
 
101
        #  | |
 
102
        #  | C  'simple|from c|content|'
 
103
        #  |/
 
104
        #  D    'simple|from c|new content|introduced in merge|'
 
105
        self.vf.add_lines(self.fc_key, [self.fa_key],
 
106
                          ['simple\n', 'from c\n', 'content\n'])
 
107
        self.vf.add_lines(self.fd_key, [self.fb_key, self.fc_key],
 
108
                          ['simple\n', 'from c\n', 'new content\n',
 
109
                           'introduced in merge\n'])
 
110
 
 
111
    def make_common_merge_text(self):
 
112
        """Both sides of the merge will have introduced a line."""
 
113
        self.make_simple_text()
 
114
        #  A    'simple|content|'
 
115
        #  |\
 
116
        #  B |  'simple|new content|'
 
117
        #  | |
 
118
        #  | C  'simple|new content|'
 
119
        #  |/
 
120
        #  D    'simple|new content|'
 
121
        self.vf.add_lines(self.fc_key, [self.fa_key],
 
122
                          ['simple\n', 'new content\n'])
 
123
        self.vf.add_lines(self.fd_key, [self.fb_key, self.fc_key],
 
124
                          ['simple\n', 'new content\n'])
 
125
 
 
126
    def make_many_way_common_merge_text(self):
 
127
        self.make_simple_text()
 
128
        #  A-.    'simple|content|'
 
129
        #  |\ \
 
130
        #  B | |  'simple|new content|'
 
131
        #  | | |
 
132
        #  | C |  'simple|new content|'
 
133
        #  |/  |
 
134
        #  D   |  'simple|new content|'
 
135
        #  |   |
 
136
        #  |   E  'simple|new content|'
 
137
        #  |  /
 
138
        #  F-'    'simple|new content|'
 
139
        self.vf.add_lines(self.fc_key, [self.fa_key],
 
140
                          ['simple\n', 'new content\n'])
 
141
        self.vf.add_lines(self.fd_key, [self.fb_key, self.fc_key],
 
142
                          ['simple\n', 'new content\n'])
 
143
        self.vf.add_lines(self.fe_key, [self.fa_key],
 
144
                          ['simple\n', 'new content\n'])
 
145
        self.vf.add_lines(self.ff_key, [self.fd_key, self.fe_key],
 
146
                          ['simple\n', 'new content\n'])
 
147
 
 
148
    def make_merge_and_restored_text(self):
 
149
        self.make_simple_text()
 
150
        #  A    'simple|content|'
 
151
        #  |\
 
152
        #  B |  'simple|new content|'
 
153
        #  | |
 
154
        #  C |  'simple|content|' # reverted to A
 
155
        #   \|
 
156
        #    D  'simple|content|'
 
157
        # c reverts back to 'a' for the new content line
 
158
        self.vf.add_lines(self.fc_key, [self.fb_key],
 
159
                          ['simple\n', 'content\n'])
 
160
        # d merges 'a' and 'c', to find both claim last modified
 
161
        self.vf.add_lines(self.fd_key, [self.fa_key, self.fc_key],
 
162
                          ['simple\n', 'content\n'])
 
163
 
 
164
    def assertAnnotateEqual(self, expected_annotation, key, exp_text=None):
 
165
        annotation, lines = self.ann.annotate(key)
 
166
        self.assertEqual(expected_annotation, annotation)
 
167
        if exp_text is None:
 
168
            record = self.vf.get_record_stream([key], 'unordered', True).next()
 
169
            exp_text = record.get_bytes_as('fulltext')
 
170
        self.assertEqualDiff(exp_text, ''.join(lines))
 
171
 
 
172
    def test_annotate_missing(self):
 
173
        self.make_simple_text()
 
174
        self.assertRaises(errors.RevisionNotPresent,
 
175
                          self.ann.annotate, ('not', 'present'))
 
176
 
 
177
    def test_annotate_simple(self):
 
178
        self.make_simple_text()
 
179
        self.assertAnnotateEqual([(self.fa_key,)]*2, self.fa_key)
 
180
        self.assertAnnotateEqual([(self.fa_key,), (self.fb_key,)], self.fb_key)
 
181
 
 
182
    def test_annotate_merge_text(self):
 
183
        self.make_merge_text()
 
184
        self.assertAnnotateEqual([(self.fa_key,), (self.fc_key,),
 
185
                                  (self.fb_key,), (self.fd_key,)],
 
186
                                 self.fd_key)
 
187
 
 
188
    def test_annotate_common_merge_text(self):
 
189
        self.make_common_merge_text()
 
190
        self.assertAnnotateEqual([(self.fa_key,), (self.fb_key, self.fc_key)],
 
191
                                 self.fd_key)
 
192
 
 
193
    def test_annotate_many_way_common_merge_text(self):
 
194
        self.make_many_way_common_merge_text()
 
195
        self.assertAnnotateEqual([(self.fa_key,),
 
196
                                  (self.fb_key, self.fc_key, self.fe_key)],
 
197
                                 self.ff_key)
 
198
 
 
199
    def test_annotate_merge_and_restored(self):
 
200
        self.make_merge_and_restored_text()
 
201
        self.assertAnnotateEqual([(self.fa_key,), (self.fa_key, self.fc_key)],
 
202
                                 self.fd_key)
 
203
 
 
204
    def test_annotate_flat_simple(self):
 
205
        self.make_simple_text()
 
206
        self.assertEqual([(self.fa_key, 'simple\n'),
 
207
                          (self.fa_key, 'content\n'),
 
208
                         ], self.ann.annotate_flat(self.fa_key))
 
209
        self.assertEqual([(self.fa_key, 'simple\n'),
 
210
                          (self.fb_key, 'new content\n'),
 
211
                         ], self.ann.annotate_flat(self.fb_key))
 
212
 
 
213
    def test_annotate_flat_merge_and_restored_text(self):
 
214
        self.make_merge_and_restored_text()
 
215
        # fc is a simple dominator of fa
 
216
        self.assertEqual([(self.fa_key, 'simple\n'),
 
217
                          (self.fc_key, 'content\n'),
 
218
                         ], self.ann.annotate_flat(self.fd_key))
 
219
 
 
220
    def test_annotate_common_merge_text(self):
 
221
        self.make_common_merge_text()
 
222
        # there is no common point, so we just pick the lexicographical lowest
 
223
        # and 'b-id' comes before 'c-id'
 
224
        self.assertEqual([(self.fa_key, 'simple\n'),
 
225
                          (self.fb_key, 'new content\n'),
 
226
                         ], self.ann.annotate_flat(self.fd_key))
 
227
 
 
228
    def test_annotate_many_way_common_merge_text(self):
 
229
        self.make_many_way_common_merge_text()
 
230
        self.assertEqual([(self.fa_key, 'simple\n'),
 
231
                         (self.fb_key, 'new content\n')],
 
232
                         self.ann.annotate_flat(self.ff_key))
 
233
 
 
234
    def test_annotate_flat_respects_break_ann_tie(self):
 
235
        tiebreaker = annotate._break_annotation_tie
 
236
        try:
 
237
            calls = []
 
238
            def custom_tiebreaker(annotated_lines):
 
239
                self.assertEqual(2, len(annotated_lines))
 
240
                left = annotated_lines[0]
 
241
                self.assertEqual(2, len(left))
 
242
                self.assertEqual('new content\n', left[1])
 
243
                right = annotated_lines[1]
 
244
                self.assertEqual(2, len(right))
 
245
                self.assertEqual('new content\n', right[1])
 
246
                calls.append((left[0], right[0]))
 
247
                # Our custom tiebreaker takes the *largest* value, rather than
 
248
                # the *smallest* value
 
249
                if left[0] < right[0]:
 
250
                    return right
 
251
                else:
 
252
                    return left
 
253
            annotate._break_annotation_tie = custom_tiebreaker
 
254
            self.make_many_way_common_merge_text()
 
255
            self.assertEqual([(self.fa_key, 'simple\n'),
 
256
                             (self.fe_key, 'new content\n')],
 
257
                             self.ann.annotate_flat(self.ff_key))
 
258
            self.assertEqual([(self.fe_key, self.fc_key),
 
259
                              (self.fe_key, self.fb_key)], calls)
 
260
        finally:
 
261
            annotate._break_annotation_tie = tiebreaker
 
262
 
 
263
 
 
264
    def test_needed_keys_simple(self):
 
265
        self.make_simple_text()
 
266
        keys, ann_keys = self.ann._get_needed_keys(self.fb_key)
 
267
        self.assertEqual([self.fa_key, self.fb_key], sorted(keys))
 
268
        self.assertEqual({self.fa_key: 1, self.fb_key: 1},
 
269
                         self.ann._num_needed_children)
 
270
        self.assertEqual(set(), ann_keys)
 
271
 
 
272
    def test_needed_keys_many(self):
 
273
        self.make_many_way_common_merge_text()
 
274
        keys, ann_keys = self.ann._get_needed_keys(self.ff_key)
 
275
        self.assertEqual([self.fa_key, self.fb_key, self.fc_key,
 
276
                          self.fd_key, self.fe_key, self.ff_key,
 
277
                         ], sorted(keys))
 
278
        self.assertEqual({self.fa_key: 3,
 
279
                          self.fb_key: 1,
 
280
                          self.fc_key: 1,
 
281
                          self.fd_key: 1,
 
282
                          self.fe_key: 1,
 
283
                          self.ff_key: 1,
 
284
                         }, self.ann._num_needed_children)
 
285
        self.assertEqual(set(), ann_keys)
 
286
 
 
287
    def test_needed_keys_with_special_text(self):
 
288
        self.make_many_way_common_merge_text()
 
289
        spec_key = ('f-id', revision.CURRENT_REVISION)
 
290
        spec_text = 'simple\nnew content\nlocally modified\n'
 
291
        self.ann.add_special_text(spec_key, [self.fd_key, self.fe_key],
 
292
                                  spec_text)
 
293
        keys, ann_keys = self.ann._get_needed_keys(spec_key)
 
294
        self.assertEqual([self.fa_key, self.fb_key, self.fc_key,
 
295
                          self.fd_key, self.fe_key,
 
296
                         ], sorted(keys))
 
297
        self.assertEqual([spec_key], sorted(ann_keys))
 
298
 
 
299
    def test_needed_keys_with_parent_texts(self):
 
300
        self.make_many_way_common_merge_text()
 
301
        # If 'D' and 'E' are already annotated, we don't need to extract all
 
302
        # the texts
 
303
        #  D   |  'simple|new content|'
 
304
        #  |   |
 
305
        #  |   E  'simple|new content|'
 
306
        #  |  /
 
307
        #  F-'    'simple|new content|'
 
308
        self.ann._parent_map[self.fd_key] = (self.fb_key, self.fc_key)
 
309
        self.ann._text_cache[self.fd_key] = ['simple\n', 'new content\n']
 
310
        self.ann._annotations_cache[self.fd_key] = [
 
311
            (self.fa_key,),
 
312
            (self.fb_key, self.fc_key),
 
313
            ]
 
314
        self.ann._parent_map[self.fe_key] = (self.fa_key,)
 
315
        self.ann._text_cache[self.fe_key] = ['simple\n', 'new content\n']
 
316
        self.ann._annotations_cache[self.fe_key] = [
 
317
            (self.fa_key,),
 
318
            (self.fe_key,),
 
319
            ]
 
320
        keys, ann_keys = self.ann._get_needed_keys(self.ff_key)
 
321
        self.assertEqual([self.ff_key], sorted(keys))
 
322
        self.assertEqual({self.fd_key: 1,
 
323
                          self.fe_key: 1,
 
324
                          self.ff_key: 1,
 
325
                         }, self.ann._num_needed_children)
 
326
        self.assertEqual([], sorted(ann_keys))
 
327
 
 
328
    def test_record_annotation_removes_texts(self):
 
329
        self.make_many_way_common_merge_text()
 
330
        # Populate the caches
 
331
        for x in self.ann._get_needed_texts(self.ff_key):
 
332
            continue
 
333
        self.assertEqual({self.fa_key: 3,
 
334
                          self.fb_key: 1,
 
335
                          self.fc_key: 1,
 
336
                          self.fd_key: 1,
 
337
                          self.fe_key: 1,
 
338
                          self.ff_key: 1,
 
339
                         }, self.ann._num_needed_children)
 
340
        self.assertEqual([self.fa_key, self.fb_key, self.fc_key,
 
341
                          self.fd_key, self.fe_key, self.ff_key,
 
342
                         ], sorted(self.ann._text_cache.keys()))
 
343
        self.ann._record_annotation(self.fa_key, [], [])
 
344
        self.ann._record_annotation(self.fb_key, [self.fa_key], [])
 
345
        self.assertEqual({self.fa_key: 2,
 
346
                          self.fb_key: 1,
 
347
                          self.fc_key: 1,
 
348
                          self.fd_key: 1,
 
349
                          self.fe_key: 1,
 
350
                          self.ff_key: 1,
 
351
                         }, self.ann._num_needed_children)
 
352
        self.assertTrue(self.fa_key in self.ann._text_cache)
 
353
        self.assertTrue(self.fa_key in self.ann._annotations_cache)
 
354
        self.ann._record_annotation(self.fc_key, [self.fa_key], [])
 
355
        self.ann._record_annotation(self.fd_key, [self.fb_key, self.fc_key], [])
 
356
        self.assertEqual({self.fa_key: 1,
 
357
                          self.fb_key: 0,
 
358
                          self.fc_key: 0,
 
359
                          self.fd_key: 1,
 
360
                          self.fe_key: 1,
 
361
                          self.ff_key: 1,
 
362
                         }, self.ann._num_needed_children)
 
363
        self.assertTrue(self.fa_key in self.ann._text_cache)
 
364
        self.assertTrue(self.fa_key in self.ann._annotations_cache)
 
365
        self.assertFalse(self.fb_key in self.ann._text_cache)
 
366
        self.assertFalse(self.fb_key in self.ann._annotations_cache)
 
367
        self.assertFalse(self.fc_key in self.ann._text_cache)
 
368
        self.assertFalse(self.fc_key in self.ann._annotations_cache)
 
369
 
 
370
    def test_annotate_special_text(self):
 
371
        # Things like WT and PreviewTree want to annotate an arbitrary text
 
372
        # ('current:') so we need a way to add that to the group of files to be
 
373
        # annotated.
 
374
        self.make_many_way_common_merge_text()
 
375
        #  A-.    'simple|content|'
 
376
        #  |\ \
 
377
        #  B | |  'simple|new content|'
 
378
        #  | | |
 
379
        #  | C |  'simple|new content|'
 
380
        #  |/  |
 
381
        #  D   |  'simple|new content|'
 
382
        #  |   |
 
383
        #  |   E  'simple|new content|'
 
384
        #  |  /
 
385
        #  SPEC   'simple|new content|locally modified|'
 
386
        spec_key = ('f-id', revision.CURRENT_REVISION)
 
387
        spec_text = 'simple\nnew content\nlocally modified\n'
 
388
        self.ann.add_special_text(spec_key, [self.fd_key, self.fe_key],
 
389
                                  spec_text)
 
390
        self.assertAnnotateEqual([(self.fa_key,),
 
391
                                  (self.fb_key, self.fc_key, self.fe_key),
 
392
                                  (spec_key,),
 
393
                                 ], spec_key,
 
394
                                 exp_text=spec_text)
 
395
 
 
396
    def test_no_graph(self):
 
397
        self.make_no_graph_texts()
 
398
        self.assertAnnotateEqual([(self.fa_key,),
 
399
                                  (self.fa_key,),
 
400
                                 ], self.fa_key)
 
401
        self.assertAnnotateEqual([(self.fb_key,),
 
402
                                  (self.fb_key,),
 
403
                                 ], self.fb_key)