1
# Copyright (C) 2011 Canonical Ltd.
3
# This program is free software; you can redistribute it and/or modify
4
# it under the terms of the GNU General Public License as published by
5
# the Free Software Foundation; either version 2 of the License, or
6
# (at your option) any later version.
8
# This program is distributed in the hope that it will be useful,
9
# but WITHOUT ANY WARRANTY; without even the implied warranty of
10
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11
# GNU General Public License for more details.
13
# You should have received a copy of the GNU General Public License
14
# along with this program; if not, write to the Free Software
15
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
18
"""Direct tests of the loggerhead/history.py module"""
20
from datetime import datetime

from breezy import tag, tests
from breezy.foreign import (
    ForeignRevision,
    ForeignVcs,
    VcsMapping,
    )

from .. import history as _mod_history
32
class TestCaseWithExamples(tests.TestCaseWithMemoryTransport):
    """Base test case offering small example revision graphs.

    Every ``make_*`` helper builds a branch named 'branch' with a branch
    builder, read-locks it (the unlock is registered with ``addCleanup``)
    and returns a ``(History, revision_ids)`` pair.
    """

    def _history_for(self, builder):
        """Read-lock the builder's branch and wrap it in a History."""
        branch = builder.get_branch()
        # Keep the branch read-locked for the rest of the test; unlock on
        # cleanup.
        self.addCleanup(branch.lock_read().unlock)
        return _mod_history.History(branch, {})

    def make_linear_ancestry(self):
        """Build a three revision mainline without merges.

        The graph is::

            rev1
             |
            rev2
             |
            rev3

        :return: (History, [rev1, rev2, rev3])
        """
        builder = self.make_branch_builder('branch')
        builder.start_series()
        rev1 = builder.build_snapshot(None, [
            ('add', ('', b'root-id', 'directory', None))])
        rev2 = builder.build_snapshot([rev1], [])
        rev3 = builder.build_snapshot([rev2], [])
        builder.finish_series()
        return self._history_for(builder), [rev1, rev2, rev3]

    def make_long_linear_ancestry(self):
        """Build 26 revisions: one root snapshot plus 25 follow-ups.

        :return: (History, revids) with revids in creation order
        """
        builder = self.make_branch_builder('branch')
        # The accumulator has to exist before the first append below.
        revs = []
        builder.start_series()
        revs.append(builder.build_snapshot(None, [
            ('add', ('', b'root-id', 'directory', None))]))
        # One extra snapshot per letter (25 in total); the letter itself is
        # not used.
        # NOTE(review): parents=None presumably makes the builder stack each
        # snapshot on the current tip -- confirm against BranchBuilder.
        for _ in "BCDEFGHIJKLMNOPQRSTUVWXYZ":
            revs.append(builder.build_snapshot(None, []))
        builder.finish_series()
        return self._history_for(builder), revs

    def make_merged_ancestry(self):
        """Build a graph where the tip merges a side revision.

        rev2 has rev1 as its parent, and rev3 has parents [rev1, rev2]::

            rev1
             | \\
             |  rev2
             | /
            rev3

        :return: (History, [rev1, rev2, rev3])
        """
        builder = self.make_branch_builder('branch')
        builder.start_series()
        rev1 = builder.build_snapshot(None, [
            ('add', ('', b'root-id', 'directory', None))])
        rev2 = builder.build_snapshot([rev1], [])
        rev3 = builder.build_snapshot([rev1, rev2], [])
        builder.finish_series()
        return self._history_for(builder), [rev1, rev2, rev3]

    def make_deep_merged_ancestry(self):
        """Build a graph with a merge nested inside another merge.

        Parents of each revision::

            A: (root)
            B: A
            C: A
            D: C
            E: C, D
            F: B, E

        :return: (History, [A, B, C, D, E, F])
        """
        builder = self.make_branch_builder('branch')
        builder.start_series()
        rev_a = builder.build_snapshot(None, [
            ('add', ('', b'root-id', 'directory', None))])
        rev_b = builder.build_snapshot([rev_a], [])
        rev_c = builder.build_snapshot([rev_a], [])
        rev_d = builder.build_snapshot([rev_c], [])
        rev_e = builder.build_snapshot([rev_c, rev_d], [])
        rev_f = builder.build_snapshot([rev_b, rev_e], [])
        builder.finish_series()
        return (self._history_for(builder),
                [rev_a, rev_b, rev_c, rev_d, rev_e, rev_f])

    def assertRevidsFrom(self, expected, history, search_revs, tip_rev):
        """Assert that get_revids_from(search_revs, tip_rev) yields expected.

        The result of ``history.get_revids_from`` is fully consumed into a
        list before comparing.
        """
        actual = list(history.get_revids_from(search_revs, tip_rev))
        self.assertEqual(expected, actual)
114
class _DictProxy(object):
116
def __init__(self, d):
118
self._accessed = set()
119
self.__setitem__ = d.__setitem__
121
def __getitem__(self, name):
122
self._accessed.add(name)
129
def track_rev_info_accesses(h):
    """Track __getitem__ access to History._rev_info.

    Replaces ``h._rev_info`` with a ``_DictProxy`` wrapping the previous
    value.

    :param h: the object whose ``_rev_info`` attribute gets wrapped
    :return: set of items accessed (the proxy's ``_accessed`` set)
    """
    h._rev_info = _DictProxy(h._rev_info)
    return h._rev_info._accessed
138
class TestHistoryGetRevidsFrom(TestCaseWithExamples):
    """Tests for History.get_revids_from on the example graphs."""

    def test_get_revids_from_simple_mainline(self):
        """Without search revisions the linear mainline comes back tip first."""
        history, revs = self.make_linear_ancestry()
        self.assertRevidsFrom(list(reversed(revs)), history, None, revs[2])

    def test_get_revids_from_merged_mainline(self):
        """The merged side revision is not reported, only rev3 and rev1."""
        history, revs = self.make_merged_ancestry()
        self.assertRevidsFrom([revs[2], revs[0]],
                              history, None, revs[2])

    def test_get_revids_given_one_rev(self):
        """Searching for the merged revision reports the tip."""
        history, revs = self.make_merged_ancestry()
        # rev-3 was the first mainline revision to see rev-2.
        self.assertRevidsFrom([revs[2]], history, [revs[1]], revs[2])

    def test_get_revids_deep_ancestry(self):
        """Check each single search revision of the deep merged graph."""
        history, revs = self.make_deep_merged_ancestry()
        tip = revs[-1]
        self.assertRevidsFrom([tip], history, [revs[-1]], tip)
        self.assertRevidsFrom([tip], history, [revs[-2]], tip)
        self.assertRevidsFrom([tip], history, [revs[-3]], tip)
        self.assertRevidsFrom([tip], history, [revs[-4]], tip)
        self.assertRevidsFrom([revs[1]], history, [revs[-5]], tip)
        self.assertRevidsFrom([revs[0]], history, [revs[-6]], tip)

    def test_get_revids_doesnt_over_produce_simple_mainline(self):
        # get_revids_from shouldn't walk the whole ancestry just to get the
        # answers for the first few revisions.
        history, revs = self.make_long_linear_ancestry()
        accessed = track_rev_info_accesses(history)
        result = history.get_revids_from(None, revs[-1])
        self.assertEqual(set(), accessed)
        self.assertEqual(revs[-1], next(result))
        # We already know revs[-1] because we passed it in.
        self.assertEqual(set(), accessed)
        self.assertEqual(revs[-2], next(result))
        self.assertEqual({history._rev_indices[revs[-1]]}, accessed)

    def test_get_revids_doesnt_over_produce_for_merges(self):
        # get_revids_from shouldn't walk the whole ancestry just to get the
        # answers for the first few revisions.
        history, revs = self.make_long_linear_ancestry()
        accessed = track_rev_info_accesses(history)

        def indices_of_newest(count):
            # _rev_indices of the `count` most recent revisions.
            return {history._rev_indices[revid]
                    for revid in revs[::-1][:count]}

        result = history.get_revids_from([revs[-3], revs[-5]], revs[-1])
        self.assertEqual(set(), accessed)
        self.assertEqual(revs[-3], next(result))
        # We access 'W' because we are checking that W wasn't merged into X.
        # The important bit is that we aren't getting the whole ancestry.
        self.assertEqual(indices_of_newest(4), accessed)
        self.assertEqual(revs[-5], next(result))
        self.assertEqual(indices_of_newest(6), accessed)
        self.assertRaises(StopIteration, next, result)
        # Exhausting the iterator must not touch anything further.
        self.assertEqual(indices_of_newest(6), accessed)
197
class TestHistoryChangeFromRevision(tests.TestCaseWithTransport):
    """Tests for History._change_from_revision on real commits.

    NOTE(review): several lines of this class were missing from the reviewed
    copy of the file. Every reconstructed piece carries its own
    NOTE(review) and must be confirmed against loggerhead/history.py.
    """

    def make_single_commit(self):
        """Commit once to a fresh tree named 'test'.

        The branch is write-locked; the unlock is registered as a cleanup.

        :return: (History, Revision) for the new commit
        """
        # NOTE(review): the original call had one more trailing argument
        # that is not visible here -- restore it if it matters.
        tree = self.make_branch_and_tree('test')
        rev_id = tree.commit('Commit Message', timestamp=1299838474.317,
                             timezone=3600,
                             committer='Joe Example <joe@example.com>')
        self.addCleanup(tree.branch.lock_write().unlock)
        rev = tree.branch.repository.get_revision(rev_id)
        history = _mod_history.History(tree.branch, {})
        # test_simple unpacks ``history, rev`` from this helper.
        return history, rev

    def test_simple(self):
        """Check the fields of the change built from a plain commit."""
        history, rev = self.make_single_commit()
        change = history._change_from_revision(rev)
        self.assertEqual(rev.revision_id, change.revid)
        # NOTE(review): the attribute names compared in the next three
        # assertions (date, utc_date, authors) are reconstructed -- confirm.
        self.assertEqual(datetime.fromtimestamp(1299838474.317),
                         change.date)
        self.assertEqual(datetime.utcfromtimestamp(1299838474.317),
                         change.utc_date)
        self.assertEqual(['Joe Example <joe@example.com>'],
                         change.authors)
        self.assertEqual('test', change.branch_nick)
        self.assertEqual('Commit Message', change.short_comment)
        self.assertEqual('Commit Message', change.comment)
        self.assertEqual(['Commit Message'], change.comment_clean)
        self.assertEqual([], change.parents)
        self.assertEqual([], change.bugs)
        self.assertEqual(None, change.tags)

    # NOTE(review): the ``def`` line of this test was missing; the name is
    # reconstructed.
    def test_tags(self):
        """Tags on the revision are reported as one comma separated string."""
        history, rev = self.make_single_commit()
        # NOTE(review): the binding of ``b`` was missing; presumably the
        # branch wrapped by the History object -- confirm the attribute.
        b = history._branch
        b.tags.set_tag('tag-1', rev.revision_id)
        b.tags.set_tag('tag-2', rev.revision_id)
        b.tags.set_tag('Tag-10', rev.revision_id)
        change = history._change_from_revision(rev)
        # If available, tags are 'naturally' sorted. (sorting numbers in order,
        # and ignoring case, etc.)
        if getattr(tag, 'sort_natural', None) is not None:
            self.assertEqual('tag-1, tag-2, Tag-10', change.tags)
        else:
            self.assertEqual('Tag-10, tag-1, tag-2', change.tags)

    def test_committer_vs_authors(self):
        """The committer and the 'authors' revprop are reported separately."""
        tree = self.make_branch_and_tree('test')
        rev_id = tree.commit('Commit Message', timestamp=1299838474.317,
                             timezone=3600,
                             committer='Joe Example <joe@example.com>',
                             revprops={
                                 'authors':
                                     u'A Author <aauthor@example.com>\n'
                                     u'B Author <bauthor@example.com>'})
        self.addCleanup(tree.branch.lock_write().unlock)
        rev = tree.branch.repository.get_revision(rev_id)
        history = _mod_history.History(tree.branch, {})
        change = history._change_from_revision(rev)
        # NOTE(review): ``committer`` / ``authors`` are reconstructed from
        # the expected values and the test name -- confirm.
        self.assertEqual(u'Joe Example <joe@example.com>',
                         change.committer)
        self.assertEqual([u'A Author <aauthor@example.com>',
                          u'B Author <bauthor@example.com>'],
                         change.authors)
258
class TestHistory_IterateSufficiently(tests.TestCase):
    """Tests for History._iterate_sufficiently."""

    def assertIterate(self, expected, iterable, stop_at, extra_rev_count):
        """Assert _iterate_sufficiently(iterable, stop_at, extra) == expected."""
        actual = _mod_history.History._iterate_sufficiently(
            iterable, stop_at, extra_rev_count)
        self.assertEqual(expected, actual)

    def test_iter_no_extra(self):
        """With no extras the result ends at stop_at, inclusive."""
        lst = list('abcdefghijklmnopqrstuvwxyz')
        self.assertIterate(['a', 'b', 'c'], iter(lst), 'c', 0)
        self.assertIterate(['a', 'b', 'c', 'd'], iter(lst), 'd', 0)

    def test_iter_not_found(self):
        # If the key in question isn't found, we just exhaust the list
        lst = list('abcdefghijklmnopqrstuvwxyz')
        self.assertIterate(lst, iter(lst), 'not-there', 0)

    def test_iter_with_extra(self):
        """extra_rev_count further items follow stop_at."""
        lst = list('abcdefghijklmnopqrstuvwxyz')
        self.assertIterate(['a', 'b', 'c'], iter(lst), 'b', 1)
        self.assertIterate(['a', 'b', 'c', 'd', 'e'], iter(lst), 'c', 2)

    def test_iter_with_too_many_extra(self):
        """Asking for more extras than remain just exhausts the iterable."""
        lst = list('abcdefghijklmnopqrstuvwxyz')
        self.assertIterate(lst, iter(lst), 'y', 10)
        self.assertIterate(lst, iter(lst), 'z', 10)

    def test_iter_with_extra_None(self):
        """extra_rev_count=None with the last item yields the whole list."""
        lst = list('abcdefghijklmnopqrstuvwxyz')
        self.assertIterate(lst, iter(lst), 'z', None)
290
class TestHistoryGetView(TestCaseWithExamples):
    """Tests for History.get_view."""

    def test_get_view_limited_history(self):
        # get_view should only load enough history to serve the result, not all
        # of it.
        history, revs = self.make_long_linear_ancestry()
        accessed = track_rev_info_accesses(history)
        # NOTE(review): the tail of this call was missing. extra_rev_count=5
        # is inferred from the six revisions expected below (the requested
        # one plus five extra) -- confirm against History.get_view.
        revid, start_revid, revid_list = history.get_view(
            revs[-1], revs[-1], None, extra_rev_count=5)
        newest_six = list(reversed(revs))[:6]
        self.assertEqual(newest_six, revid_list)
        self.assertEqual(revs[-1], revid)
        self.assertEqual(revs[-1], start_revid)
        # Only the six newest revisions may have been looked up.
        self.assertEqual({history._rev_indices[x] for x in newest_six},
                         accessed)
306
class TestHistoryGetChangedUncached(TestCaseWithExamples):
    """Tests for change objects built from native and foreign revisions."""

    def test_native(self):
        """Changes for native revisions expose no foreign attributes."""
        history, revs = self.make_linear_ancestry()
        changes = history.get_changes_uncached([revs[0], revs[1]])
        # assertEqual instead of the deprecated assertEquals alias, matching
        # the rest of this module.
        self.assertEqual(2, len(changes))
        self.assertEqual(revs[0], changes[0].revid)
        self.assertEqual(revs[1], changes[1].revid)
        self.assertIs(None, getattr(changes[0], 'foreign_vcs', None))
        self.assertIs(None, getattr(changes[0], 'foreign_revid', None))

    def test_foreign(self):
        # Test with a mocked foreign revision, as it's not possible
        # to rely on any foreign plugins being installed.
        history, revs = self.make_linear_ancestry()
        foreign_vcs = ForeignVcs(None, "vcs")
        # repr() is the formatter, hence the "('uuid', 1234)" expected below.
        foreign_vcs.show_foreign_revid = repr
        foreign_rev = ForeignRevision(
            ("uuid", 1234), VcsMapping(foreign_vcs), "revid-in-bzr",
            message="message", timestamp=234423423.3)
        change = history._change_from_revision(foreign_rev)
        self.assertEqual('revid-in-bzr', change.revid)
        self.assertEqual("('uuid', 1234)", change.foreign_revid)
        self.assertEqual("vcs", change.foreign_vcs)