/brz/remove-bazaar

To get this branch, use:
bzr branch http://gegoxaren.bato24.eu/bzr/brz/remove-bazaar

« back to all changes in this revision

Viewing changes to tests/__init__.py

Add initial test for push.

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
# Copyright (C) 2006, 2007 Canonical Ltd
 
2
#
 
3
# This program is free software; you can redistribute it and/or modify
 
4
# it under the terms of the GNU General Public License as published by
 
5
# the Free Software Foundation; either version 2 of the License, or
 
6
# (at your option) any later version.
 
7
#
 
8
# This program is distributed in the hope that it will be useful,
 
9
# but WITHOUT ANY WARRANTY; without even the implied warranty of
 
10
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
11
# GNU General Public License for more details.
 
12
#
 
13
# You should have received a copy of the GNU General Public License
 
14
# along with this program; if not, write to the Free Software
 
15
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
 
16
 
 
17
"""The basic test suite for bzr-git."""
 
18
 
 
19
import subprocess
 
20
import time
 
21
 
 
22
from bzrlib import (
 
23
    osutils,
 
24
    tests,
 
25
    trace,
 
26
    )
 
27
from bzrlib.plugins.git import (
 
28
    errors,
 
29
    )
 
30
 
 
31
TestCase = tests.TestCase
 
32
TestCaseInTempDir = tests.TestCaseInTempDir
 
33
TestCaseWithTransport = tests.TestCaseWithTransport
 
34
TestCaseWithMemoryTransport = tests.TestCaseWithMemoryTransport
 
35
 
 
36
class _GitCommandFeature(tests.Feature):
 
37
 
 
38
    def _probe(self):
 
39
        try:
 
40
            p = subprocess.Popen(['git', '--version'], stdout=subprocess.PIPE)
 
41
        except IOError:
 
42
            return False
 
43
        out, err = p.communicate()
 
44
        trace.mutter('Using: %s', out.rstrip('\n'))
 
45
        return True
 
46
 
 
47
    def feature_name(self):
 
48
        return 'git'
 
49
 
 
50
GitCommandFeature = _GitCommandFeature()
 
51
 
 
52
 
 
53
def run_git(*args):
 
54
    cmd = ['git'] + list(args)
 
55
    p = subprocess.Popen(cmd,
 
56
                         stdout=subprocess.PIPE,
 
57
                         stderr=subprocess.PIPE)
 
58
    out, err = p.communicate()
 
59
    if p.returncode != 0:
 
60
        raise AssertionError('Bad return code: %d for %s:\n%s'
 
61
                             % (p.returncode, ' '.join(cmd), err))
 
62
    return out
 
63
 
 
64
 
 
65
class GitBranchBuilder(object):
 
66
 
 
67
    def __init__(self, stream=None):
 
68
        self.commit_info = []
 
69
        self.stream = stream
 
70
        self._process = None
 
71
        self._counter = 0
 
72
        self._branch = 'refs/heads/master'
 
73
        if stream is None:
 
74
            # Write the marks file into the git sandbox.
 
75
            self._marks_file_name = osutils.abspath('marks')
 
76
            self._process = subprocess.Popen(
 
77
                ['git', 'fast-import', '--quiet',
 
78
                 # GIT doesn't support '--export-marks foo'
 
79
                 # it only supports '--export-marks=foo'
 
80
                 # And gives a 'unknown option' otherwise.
 
81
                 '--export-marks=' + self._marks_file_name,
 
82
                ],
 
83
                stdout=subprocess.PIPE,
 
84
                stderr=subprocess.PIPE,
 
85
                stdin=subprocess.PIPE,
 
86
                )
 
87
            self.stream = self._process.stdin
 
88
        else:
 
89
            self._process = None
 
90
 
 
91
    def set_branch(self, branch):
 
92
        """Set the branch we are committing."""
 
93
        self._branch = branch
 
94
 
 
95
    def _write(self, text):
 
96
        try:
 
97
            self.stream.write(text)
 
98
        except IOError, e:
 
99
            if self._process is None:
 
100
                raise
 
101
            raise errors.GitCommandError(self._process.returncode,
 
102
                                         'git fast-import',
 
103
                                         self._process.stderr.read())
 
104
 
 
105
    def _writelines(self, lines):
 
106
        try:
 
107
            self.stream.writelines(lines)
 
108
        except IOError, e:
 
109
            if self._process is None:
 
110
                raise
 
111
            raise errors.GitCommandError(self._process.returncode,
 
112
                                         'git fast-import',
 
113
                                         self._process.stderr.read())
 
114
 
 
115
    def _create_blob(self, content):
 
116
        self._counter += 1
 
117
        self._write('blob\n')
 
118
        self._write('mark :%d\n' % (self._counter,))
 
119
        self._write('data %d\n' % (len(content),))
 
120
        self._write(content)
 
121
        self._write('\n')
 
122
        return self._counter
 
123
 
 
124
    def set_symlink(self, path, content):
 
125
        """Create or update symlink at a given path."""
 
126
        mark = self._create_blob(content)
 
127
        mode = '120000'
 
128
        self.commit_info.append('M %s :%d %s\n'
 
129
                % (mode, mark, self._encode_path(path)))
 
130
 
 
131
    def set_file(self, path, content, executable):
 
132
        """Create or update content at a given path."""
 
133
        mark = self._create_blob(content)
 
134
        if executable:
 
135
            mode = '100755'
 
136
        else:
 
137
            mode = '100644'
 
138
        self.commit_info.append('M %s :%d %s\n'
 
139
                                % (mode, mark, self._encode_path(path)))
 
140
 
 
141
    def set_link(self, path, link_target):
 
142
        """Create or update a link at a given path."""
 
143
        mark = self._create_blob(link_target)
 
144
        self.commit_info.append('M 120000 :%d %s\n'
 
145
                                % (mark, self._encode_path(path)))
 
146
 
 
147
    def delete_entry(self, path):
 
148
        """This will delete files or symlinks at the given location."""
 
149
        self.commit_info.append('D %s\n' % (self._encode_path(path),))
 
150
 
 
151
    @staticmethod
 
152
    def _encode_path(path):
 
153
        if '\n' in path or path[0] == '"':
 
154
            path = path.replace('\\', '\\\\')
 
155
            path = path.replace('\n', '\\n')
 
156
            path = path.replace('"', '\\"')
 
157
            path = '"' + path + '"'
 
158
        return path.encode('utf-8')
 
159
 
 
160
    # TODO: Author
 
161
    # TODO: Author timestamp+timezone
 
162
    def commit(self, committer, message, timestamp=None,
 
163
               timezone='+0000', author=None,
 
164
               merge=None, base=None):
 
165
        """Commit the new content.
 
166
 
 
167
        :param committer: The name and address for the committer
 
168
        :param message: The commit message
 
169
        :param timestamp: The timestamp for the commit
 
170
        :param timezone: The timezone of the commit, such as '+0000' or '-1000'
 
171
        :param author: The name and address of the author (if different from
 
172
            committer)
 
173
        :param merge: A list of marks if this should merge in another commit
 
174
        :param base: An id for the base revision (primary parent) if that
 
175
            is not the last commit.
 
176
        :return: A mark which can be used in the future to reference this
 
177
            commit.
 
178
        """
 
179
        self._counter += 1
 
180
        mark = self._counter
 
181
        if timestamp is None:
 
182
            timestamp = int(time.time())
 
183
        self._write('commit %s\n' % (self._branch,))
 
184
        self._write('mark :%d\n' % (mark,))
 
185
        self._write('committer %s %s %s\n'
 
186
                    % (committer, timestamp, timezone))
 
187
        message = message.encode('UTF-8')
 
188
        self._write('data %d\n' % (len(message),))
 
189
        self._write(message)
 
190
        self._write('\n')
 
191
        if base is not None:
 
192
            self._write('from :%d\n' % (base,))
 
193
        if merge is not None:
 
194
            for m in merge:
 
195
                self._write('merge :%d\n' % (m,))
 
196
        self._writelines(self.commit_info)
 
197
        self._write('\n')
 
198
        self.commit_info = []
 
199
        return mark
 
200
 
 
201
    def reset(self, ref=None, mark=None):
 
202
        """Create or recreate the named branch.
 
203
 
 
204
        :param ref: branch name, defaults to the current branch.
 
205
        :param mark: commit the branch will point to.
 
206
        """
 
207
        if ref is None:
 
208
            ref = self._branch
 
209
        self._write('reset %s\n' % (ref,))
 
210
        if mark is not None:
 
211
            self._write('from :%d\n' % mark)
 
212
        self._write('\n')
 
213
 
 
214
    def finish(self):
 
215
        """We are finished building, close the stream, get the id mapping"""
 
216
        self.stream.close()
 
217
        if self._process is None:
 
218
            return {}
 
219
        if self._process.wait() != 0:
 
220
            raise errors.GitCommandError(self._process.returncode,
 
221
                                         'git fast-import',
 
222
                                         self._process.stderr.read())
 
223
        marks_file = open(self._marks_file_name)
 
224
        mapping = {}
 
225
        for line in marks_file:
 
226
            mark, shasum = line.split()
 
227
            assert mark.startswith(':')
 
228
            mapping[int(mark[1:])] = shasum
 
229
        marks_file.close()
 
230
        return mapping
 
231
 
 
232
 
 
233
def test_suite():
 
234
    loader = tests.TestLoader()
 
235
 
 
236
    suite = tests.TestSuite()
 
237
 
 
238
    testmod_names = [
 
239
        'test_blackbox',
 
240
        'test_builder',
 
241
        'test_branch',
 
242
        'test_cache',
 
243
        'test_dir',
 
244
        'test_fetch',
 
245
        'test_mapping',
 
246
        'test_object_store',
 
247
        'test_push',
 
248
        'test_remote',
 
249
        'test_repository',
 
250
        'test_refs',
 
251
        'test_revspec',
 
252
        'test_roundtrip',
 
253
        'test_transportgit',
 
254
        ]
 
255
    testmod_names = ['%s.%s' % (__name__, t) for t in testmod_names]
 
256
    suite.addTests(loader.loadTestsFromModuleNames(testmod_names))
 
257
 
 
258
    return suite