/brz/remove-bazaar

To get this branch, use:
bzr branch http://gegoxaren.bato24.eu/bzr/brz/remove-bazaar

« back to all changes in this revision

Viewing changes to tests/test_generic_processor.py

  • Committer: Ian Clatworthy
  • Date: 2009-02-17 23:37:24 UTC
  • mto: (0.64.114 trunk)
  • mto: This revision was merged to the branch mainline in revision 6631.
  • Revision ID: ian.clatworthy@canonical.com-20090217233724-y6q12cyoyln6vkh6
code & tests for file copying

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
# Copyright (C) 2008 Canonical Ltd
 
2
#
 
3
# This program is free software; you can redistribute it and/or modify
 
4
# it under the terms of the GNU General Public License as published by
 
5
# the Free Software Foundation; either version 2 of the License, or
 
6
# (at your option) any later version.
 
7
#
 
8
# This program is distributed in the hope that it will be useful,
 
9
# but WITHOUT ANY WARRANTY; without even the implied warranty of
 
10
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
11
# GNU General Public License for more details.
 
12
#
 
13
# You should have received a copy of the GNU General Public License
 
14
# along with this program; if not, write to the Free Software
 
15
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
 
16
 
 
17
import time
 
18
 
 
19
from bzrlib import (
 
20
    branch,
 
21
    tests,
 
22
    )
 
23
 
 
24
from bzrlib.plugins.fastimport import (
 
25
    commands,
 
26
    errors,
 
27
    )
 
28
 
 
29
from bzrlib.plugins.fastimport.processors import (
 
30
    generic_processor,
 
31
    )
 
32
 
 
33
 
 
34
class TestCaseForGenericProcessor(tests.TestCaseWithTransport):
    """Base class providing a processor fixture and tree-delta assertions."""

    def get_handler(self):
        """Make a branch in '.' and a GenericProcessor for its bzrdir.

        :return: handler, branch
        """
        branch = self.make_branch('.')
        handler = generic_processor.GenericProcessor(branch.bzrdir)
        return handler, branch

    # The defaults are empty tuples rather than lists so that they are
    # immutable and cannot be shared-and-mutated between calls. None is
    # reserved to mean "don't check this category".
    def assertChanges(self, branch, revno, expected_added=(),
            expected_removed=(), expected_modified=(),
            expected_renamed=()):
        """Check the changes introduced in a revision of a branch.

        This method checks that a revision introduces expected changes.
        The required changes are passed in as a sequence, where
        each entry contains the needed information about the change.

        If you do not wish to assert anything about a particular
        category then pass None instead.

        The delta is computed from entry ``revno - 1`` to entry ``revno``
        of the branch's revision history.

        branch: The branch.
        revno: index into the revision history of the revision to check.
        expected_added: a sequence of (filename,) tuples that must have
            been added in the delta.
        expected_removed: a sequence of (filename,) tuples that must have
            been removed in the delta.
        expected_modified: a sequence of (filename,) tuples that must have
            been modified in the delta.
        expected_renamed: a sequence of (old_path, new_path) tuples that
            must have been renamed in the delta.
        :return: revtree1, revtree2
        """
        repo = branch.repository
        # Fetch the history once; both trees are looked up from it.
        history = branch.revision_history()
        revtree1 = repo.revision_tree(history[revno - 1])
        revtree2 = repo.revision_tree(history[revno])
        changes = revtree2.changes_from(revtree1)
        self.check_changes(changes, expected_added, expected_removed,
            expected_modified, expected_renamed)
        return revtree1, revtree2

    def check_changes(self, changes, expected_added=(),
            expected_removed=(), expected_modified=(),
            expected_renamed=()):
        """Check the changes in a TreeDelta

        This method checks that the TreeDelta contains the expected
        modifications between the two trees that were used to generate
        it. The required changes are passed in as a sequence, where
        each entry contains the needed information about the change.

        For each category the number of entries must match exactly and
        every expected entry must be present in the delta.

        If you do not wish to assert anything about a particular
        category then pass None instead.

        changes: The TreeDelta to check.
        expected_added: a sequence of (filename,) tuples that must have
            been added in the delta.
        expected_removed: a sequence of (filename,) tuples that must have
            been removed in the delta.
        expected_modified: a sequence of (filename,) tuples that must have
            been modified in the delta.
        expected_renamed: a sequence of (old_path, new_path) tuples that
            must have been renamed in the delta.
        """
        renamed = changes.renamed
        added = changes.added
        removed = changes.removed
        modified = changes.modified
        if expected_renamed is not None:
            self.assertEqual(len(renamed), len(expected_renamed),
                "%s is renamed, expected %s" % (renamed, expected_renamed))
            # Only the first two fields (old path, new path) are compared.
            renamed_files = [(item[0], item[1]) for item in renamed]
            for expected_renamed_entry in expected_renamed:
                self.assertTrue(expected_renamed_entry in renamed_files,
                    "%s is not renamed, %s are" % (str(expected_renamed_entry),
                        renamed_files))
        if expected_added is not None:
            self.assertEqual(len(added), len(expected_added),
                "%s is added" % str(added))
            # Only the first field (the path) is compared.
            added_files = [(item[0],) for item in added]
            for expected_added_entry in expected_added:
                self.assertTrue(expected_added_entry in added_files,
                    "%s is not added, %s are" % (str(expected_added_entry),
                        added_files))
        if expected_removed is not None:
            self.assertEqual(len(removed), len(expected_removed),
                "%s is removed" % str(removed))
            removed_files = [(item[0],) for item in removed]
            for expected_removed_entry in expected_removed:
                self.assertTrue(expected_removed_entry in removed_files,
                    "%s is not removed, %s are" % (str(expected_removed_entry),
                        removed_files))
        if expected_modified is not None:
            self.assertEqual(len(modified), len(expected_modified),
                "%s is modified" % str(modified))
            modified_files = [(item[0],) for item in modified]
            for expected_modified_entry in expected_modified:
                self.assertTrue(expected_modified_entry in modified_files,
                    "%s is not modified, %s are" % (str(expected_modified_entry),
                        modified_files))
 
134
 
 
135
 
 
136
class TestRename(TestCaseForGenericProcessor):
    """Tests for importing a FileRenameCommand."""

    def get_command_iter(self, old_path, new_path):
        """Build a command source: one commit adding a file, one renaming it.

        old_path: path of the file created (with content "aaa") by the
            first commit.
        new_path: path the second commit renames that file to.
        :return: a callable that, when called, yields the two
            CommitCommands.
        """
        def command_list():
            author = ['', 'bugs@a.com', time.time(), time.timezone]
            committer = ['', 'elmer@a.com', time.time(), time.timezone]

            def files_one():
                # File commands for the first commit: create the file.
                yield commands.FileModifyCommand(
                    old_path, 'file', False, None, "aaa")

            def files_two():
                # File commands for the second commit: rename it.
                yield commands.FileRenameCommand(old_path, new_path)

            first_commit = commands.CommitCommand(
                'head', '1', author, committer, "commit 1", None, [],
                files_one)
            yield first_commit
            second_commit = commands.CommitCommand(
                'head', '2', author, committer, "commit 2", ":1", [],
                files_two)
            yield second_commit
        return command_list

    def test_rename_in_root(self):
        """A rename within the root is reported as a rename.

        Also checks that the entry's last-changed revision in each tree
        is that tree's own revision.
        """
        processor, imported_branch = self.get_handler()
        old_path, new_path = 'a', 'b'
        command_source = self.get_command_iter(old_path, new_path)
        processor.process(command_source)
        tree_before, tree_after = self.assertChanges(
            imported_branch, 1, expected_renamed=[(old_path, new_path)])
        self.assertEqual(
            tree_before.get_revision_id(),
            tree_before.inventory.root.children['a'].revision)
        self.assertEqual(
            tree_after.get_revision_id(),
            tree_after.inventory.root.children['b'].revision)

    def test_rename_in_subdir(self):
        """A rename within a subdirectory is reported as a rename."""
        processor, imported_branch = self.get_handler()
        old_path, new_path = 'a/a', 'a/b'
        command_source = self.get_command_iter(old_path, new_path)
        processor.process(command_source)
        self.assertChanges(
            imported_branch, 1, expected_renamed=[(old_path, new_path)])

    def test_move_to_new_dir(self):
        """Moving into a new directory adds that directory too."""
        processor, imported_branch = self.get_handler()
        old_path, new_path = 'a/a', 'b/a'
        command_source = self.get_command_iter(old_path, new_path)
        processor.process(command_source)
        self.assertChanges(
            imported_branch, 1,
            expected_renamed=[(old_path, new_path)],
            expected_added=[('b',)])
 
179
 
 
180
 
 
181
class TestCopy(TestCaseForGenericProcessor):
    """Tests for importing a FileCopyCommand."""

    def file_command_iter(self, src_path, dest_path):
        """Build a command source: one commit adding a file, one copying it.

        src_path: path of the file created (with content "aaa") by the
            first commit.
        dest_path: path the second commit copies that file to.
        :return: a callable that, when called, yields the two
            CommitCommands.
        """
        def command_list():
            author = ['', 'bugs@a.com', time.time(), time.timezone]
            committer = ['', 'elmer@a.com', time.time(), time.timezone]

            def files_one():
                # File commands for the first commit: create the source.
                yield commands.FileModifyCommand(
                    src_path, 'file', False, None, "aaa")

            def files_two():
                # File commands for the second commit: copy the source.
                yield commands.FileCopyCommand(src_path, dest_path)

            first_commit = commands.CommitCommand(
                'head', '1', author, committer, "commit 1", None, [],
                files_one)
            yield first_commit
            second_commit = commands.CommitCommand(
                'head', '2', author, committer, "commit 2", ":1", [],
                files_two)
            yield second_commit
        return command_list

    def assertContent(self, branch, tree, path, content):
        """Assert that the file at path in tree has the given text.

        The branch is read-locked while the text is fetched; the unlock
        is registered as a test cleanup.
        """
        file_id = tree.inventory.path2id(path)
        branch.lock_read()
        self.addCleanup(branch.unlock)
        actual_text = tree.get_file_text(file_id)
        self.assertEqual(actual_text, content)

    def test_copy_file_in_root(self):
        """A copy within the root adds the destination with same content.

        Also checks that the entry's last-changed revision in each tree
        is that tree's own revision.
        """
        processor, imported_branch = self.get_handler()
        src_path, dest_path = 'a', 'b'
        command_source = self.file_command_iter(src_path, dest_path)
        processor.process(command_source)
        tree_before, tree_after = self.assertChanges(
            imported_branch, 1, expected_added=[(dest_path,)])
        self.assertContent(imported_branch, tree_after, dest_path, "aaa")
        self.assertEqual(
            tree_before.get_revision_id(),
            tree_before.inventory.root.children['a'].revision)
        self.assertEqual(
            tree_after.get_revision_id(),
            tree_after.inventory.root.children['b'].revision)

    def test_copy_file_in_subdir(self):
        """A copy within a subdirectory adds only the destination file."""
        processor, imported_branch = self.get_handler()
        src_path, dest_path = 'a/a', 'a/b'
        command_source = self.file_command_iter(src_path, dest_path)
        processor.process(command_source)
        tree_before, tree_after = self.assertChanges(
            imported_branch, 1, expected_added=[(dest_path,)])
        self.assertContent(imported_branch, tree_after, dest_path, "aaa")

    def test_copy_file_to_new_dir(self):
        """A copy into a new directory adds the directory and the file."""
        processor, imported_branch = self.get_handler()
        src_path, dest_path = 'a/a', 'b/a'
        command_source = self.file_command_iter(src_path, dest_path)
        processor.process(command_source)
        tree_before, tree_after = self.assertChanges(
            imported_branch, 1, expected_added=[('b',), (dest_path,)])
        self.assertContent(imported_branch, tree_after, dest_path, "aaa")
 
234
 
 
235
 
 
236
class TestFileKinds(TestCaseForGenericProcessor):
    """Tests that files of different kinds can be imported."""

    def get_command_iter(self, path, kind, content):
        """Build a command source with a single commit adding one file.

        path: path of the file to create.
        kind: kind passed to FileModifyCommand (e.g. 'file', 'symlink').
        content: data passed to FileModifyCommand.
        :return: a callable that, when called, yields the CommitCommand.
        """
        def command_list():
            committer = ['', 'elmer@a.com', time.time(), time.timezone]

            def files_one():
                yield commands.FileModifyCommand(
                    path, kind, False, None, content)

            # No author is supplied for this commit, only a committer.
            only_commit = commands.CommitCommand(
                'head', '1', None, committer, "commit 1", None, [],
                files_one)
            yield only_commit
        return command_list

    def test_import_plainfile(self):
        """Processing a plain file completes without raising."""
        processor, imported_branch = self.get_handler()
        command_source = self.get_command_iter('foo', 'file', 'aaa')
        processor.process(command_source)

    def test_import_symlink(self):
        """Processing a symlink completes without raising."""
        processor, imported_branch = self.get_handler()
        command_source = self.get_command_iter('foo', 'symlink', 'bar')
        processor.process(command_source)