1
# Copyright (C) 2008 Canonical Ltd
3
# This program is free software; you can redistribute it and/or modify
4
# it under the terms of the GNU General Public License as published by
5
# the Free Software Foundation; either version 2 of the License, or
6
# (at your option) any later version.
8
# This program is distributed in the hope that it will be useful,
9
# but WITHOUT ANY WARRANTY; without even the implied warranty of
10
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11
# GNU General Public License for more details.
13
# You should have received a copy of the GNU General Public License
14
# along with this program; if not, write to the Free Software
15
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
24
from bzrlib.plugins.fastimport import (
29
from bzrlib.plugins.fastimport.processors import (
34
class TestCaseForGenericProcessor(tests.TestCaseWithTransport):
36
def get_handler(self):
37
branch = self.make_branch('.')
38
handler = generic_processor.GenericProcessor(branch.bzrdir)
39
return handler, branch
41
# FIXME: [] as a default is bad, as it is mutable, but I want
42
# to use None to mean "don't check this".
43
def assertChanges(self, branch, revno, expected_added=[],
44
expected_removed=[], expected_modified=[],
46
"""Check the changes introduced in a revision of a branch.
48
This method checks that a revision introduces expected changes.
49
The required changes are passed in as a list, where
50
each entry contains the needed information about the change.
52
If you do not wish to assert anything about a particular
53
category then pass None instead.
56
revno: revision number of revision to check.
57
expected_added: a list of (filename,) tuples that must have
58
been added in the delta.
59
expected_removed: a list of (filename,) tuples that must have
60
been removed in the delta.
61
expected_modified: a list of (filename,) tuples that must have
62
been modified in the delta.
63
expected_renamed: a list of (old_path, new_path) tuples that
64
must have been renamed in the delta.
65
:return: revtree1, revtree2
67
repo = branch.repository
68
revtree1 = repo.revision_tree(branch.revision_history()[revno - 1])
69
revtree2 = repo.revision_tree(branch.revision_history()[revno])
70
changes = revtree2.changes_from(revtree1)
71
self.check_changes(changes, expected_added, expected_removed,
72
expected_modified, expected_renamed)
73
return revtree1, revtree2
75
def check_changes(self, changes, expected_added=[],
76
expected_removed=[], expected_modified=[],
78
"""Check the changes in a TreeDelta
80
This method checks that the TreeDelta contains the expected
81
modifications between the two trees that were used to generate
82
it. The required changes are passed in as a list, where
83
each entry contains the needed information about the change.
85
If you do not wish to assert anything about a particular
86
category then pass None instead.
88
changes: The TreeDelta to check.
89
expected_added: a list of (filename,) tuples that must have
90
been added in the delta.
91
expected_removed: a list of (filename,) tuples that must have
92
been removed in the delta.
93
expected_modified: a list of (filename,) tuples that must have
94
been modified in the delta.
95
expected_renamed: a list of (old_path, new_path) tuples that
96
must have been renamed in the delta.
98
renamed = changes.renamed
100
removed = changes.removed
101
modified = changes.modified
102
if expected_renamed is not None:
103
self.assertEquals(len(renamed), len(expected_renamed),
104
"%s is renamed, expected %s" % (renamed, expected_renamed))
105
renamed_files = [(item[0], item[1]) for item in renamed]
106
for expected_renamed_entry in expected_renamed:
107
self.assertTrue(expected_renamed_entry in renamed_files,
108
"%s is not renamed, %s are" % (str(expected_renamed_entry),
110
if expected_added is not None:
111
self.assertEquals(len(added), len(expected_added),
112
"%s is added" % str(added))
113
added_files = [(item[0],) for item in added]
114
for expected_added_entry in expected_added:
115
self.assertTrue(expected_added_entry in added_files,
116
"%s is not added, %s are" % (str(expected_added_entry),
118
if expected_removed is not None:
119
self.assertEquals(len(removed), len(expected_removed),
120
"%s is removed" % str(removed))
121
removed_files = [(item[0],) for item in removed]
122
for expected_removed_entry in expected_removed:
123
self.assertTrue(expected_removed_entry in removed_files,
124
"%s is not removed, %s are" % (str(expected_removed_entry),
126
if expected_modified is not None:
127
self.assertEquals(len(modified), len(expected_modified),
128
"%s is modified" % str(modified))
129
modified_files = [(item[0],) for item in modified]
130
for expected_modified_entry in expected_modified:
131
self.assertTrue(expected_modified_entry in modified_files,
132
"%s is not modified, %s are" % (str(expected_modified_entry),
136
class TestRename(TestCaseForGenericProcessor):
138
def get_command_iter(self, old_path, new_path):
140
author = ['', 'bugs@a.com', time.time(), time.timezone]
141
committer = ['', 'elmer@a.com', time.time(), time.timezone]
143
yield commands.FileModifyCommand(old_path, 'file', False,
145
yield commands.CommitCommand('head', '1', author,
146
committer, "commit 1", None, [], files_one)
148
yield commands.FileRenameCommand(old_path, new_path)
149
yield commands.CommitCommand('head', '2', author,
150
committer, "commit 2", ":1", [], files_two)
153
def test_rename_in_root(self):
154
handler, branch = self.get_handler()
157
handler.process(self.get_command_iter(old_path, new_path))
158
revtree1, revtree2 = self.assertChanges(branch, 1,
159
expected_renamed=[(old_path, new_path)])
160
self.assertEqual(revtree1.get_revision_id(),
161
revtree1.inventory.root.children['a'].revision)
162
self.assertEqual(revtree2.get_revision_id(),
163
revtree2.inventory.root.children['b'].revision)
165
def test_rename_in_subdir(self):
166
handler, branch = self.get_handler()
169
handler.process(self.get_command_iter(old_path, new_path))
170
self.assertChanges(branch, 1, expected_renamed=[(old_path, new_path)])
172
def test_move_to_new_dir(self):
173
handler, branch = self.get_handler()
176
handler.process(self.get_command_iter(old_path, new_path))
177
self.assertChanges(branch, 1, expected_renamed=[(old_path, new_path)],
178
expected_added=[('b',)])
181
class TestCopy(TestCaseForGenericProcessor):
183
def file_command_iter(self, src_path, dest_path):
185
author = ['', 'bugs@a.com', time.time(), time.timezone]
186
committer = ['', 'elmer@a.com', time.time(), time.timezone]
188
yield commands.FileModifyCommand(src_path, 'file', False,
190
yield commands.CommitCommand('head', '1', author,
191
committer, "commit 1", None, [], files_one)
193
yield commands.FileCopyCommand(src_path, dest_path)
194
yield commands.CommitCommand('head', '2', author,
195
committer, "commit 2", ":1", [], files_two)
198
def assertContent(self, branch, tree, path, content):
199
file_id = tree.inventory.path2id(path)
201
self.addCleanup(branch.unlock)
202
self.assertEqual(tree.get_file_text(file_id), content)
204
def test_copy_file_in_root(self):
205
handler, branch = self.get_handler()
208
handler.process(self.file_command_iter(src_path, dest_path))
209
revtree1, revtree2 = self.assertChanges(branch, 1,
210
expected_added=[(dest_path,)])
211
self.assertContent(branch, revtree2, dest_path, "aaa")
212
self.assertEqual(revtree1.get_revision_id(),
213
revtree1.inventory.root.children['a'].revision)
214
self.assertEqual(revtree2.get_revision_id(),
215
revtree2.inventory.root.children['b'].revision)
217
def test_copy_file_in_subdir(self):
218
handler, branch = self.get_handler()
221
handler.process(self.file_command_iter(src_path, dest_path))
222
revtree1, revtree2 = self.assertChanges(branch, 1,
223
expected_added=[(dest_path,)])
224
self.assertContent(branch, revtree2, dest_path, "aaa")
226
def test_copy_file_to_new_dir(self):
227
handler, branch = self.get_handler()
230
handler.process(self.file_command_iter(src_path, dest_path))
231
revtree1, revtree2 = self.assertChanges(branch, 1,
232
expected_added=[('b',), (dest_path,)])
233
self.assertContent(branch, revtree2, dest_path, "aaa")
236
class TestFileKinds(TestCaseForGenericProcessor):
238
def get_command_iter(self, path, kind, content):
240
committer = ['', 'elmer@a.com', time.time(), time.timezone]
242
yield commands.FileModifyCommand(path, kind, False,
244
yield commands.CommitCommand('head', '1', None,
245
committer, "commit 1", None, [], files_one)
248
def test_import_plainfile(self):
249
handler, branch = self.get_handler()
250
handler.process(self.get_command_iter('foo', 'file', 'aaa'))
252
def test_import_symlink(self):
253
handler, branch = self.get_handler()
254
handler.process(self.get_command_iter('foo', 'symlink', 'bar'))