/brz/remove-bazaar

To get this branch, use:
bzr branch http://gegoxaren.bato24.eu/bzr/brz/remove-bazaar

« back to all changes in this revision

Viewing changes to bzrlib/tests/script.py

Fixed as per Martin's review.

* doc/developers/testing.txt: 
Add a shelli-like tests section.

* bzrlib/tests/test_script.py:
Update scripts for new prefixes and fix echo tests.

* bzrlib/tests/script.py: Move doc to doc/developers/testing.txt.
(_script_to_commands): Commands are prefixed by '$', expected
output are not prefixed anymore.
(ScriptRunner.do_echo): Put spaces between arguments.

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
# Copyright (C) 2009 Canonical Ltd
 
2
#
 
3
# This program is free software; you can redistribute it and/or modify
 
4
# it under the terms of the GNU General Public License as published by
 
5
# the Free Software Foundation; either version 2 of the License, or
 
6
# (at your option) any later version.
 
7
#
 
8
# This program is distributed in the hope that it will be useful,
 
9
# but WITHOUT ANY WARRANTY; without even the implied warranty of
 
10
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
11
# GNU General Public License for more details.
 
12
#
 
13
# You should have received a copy of the GNU General Public License
 
14
# along with this program; if not, write to the Free Software
 
15
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
 
16
 
 
17
"""Shell-like test scripts.
 
18
 
 
19
See developpers/testing.html for more explanations.
 
20
"""
 
21
 
 
22
import doctest
 
23
import errno
 
24
import glob
 
25
import os
 
26
import shlex
 
27
from cStringIO import StringIO
 
28
 
 
29
from bzrlib import (
 
30
    osutils,
 
31
    tests,
 
32
    )
 
33
 
 
34
 
 
35
def split(s):
 
36
    """Split a command line respecting quotes."""
 
37
    scanner = shlex.shlex(s)
 
38
    scanner.quotes = '\'"`'
 
39
    scanner.whitespace_split = True
 
40
    for t in list(scanner):
 
41
        yield t
 
42
 
 
43
 
 
44
def _script_to_commands(text, file_name=None):
 
45
    """Turn a script into a list of commands with their associated IOs.
 
46
 
 
47
    Each command appears on a line by itself. It can be associated with an
 
48
    input that will feed it and an expected output.
 
49
    Comments starts with '#' until the end of line.
 
50
    Empty lines are ignored.
 
51
    Input and output are full lines terminated by a '\n'.
 
52
    Input lines start with '<'.
 
53
    Output lines start with '>'.
 
54
    Error lines start with '2>'.
 
55
    """
 
56
 
 
57
    commands = []
 
58
 
 
59
    def add_command(cmd, input, output, error):
 
60
        if cmd is not None:
 
61
            if input is not None:
 
62
                input = ''.join(input)
 
63
            if output is not None:
 
64
                output = ''.join(output)
 
65
            if error is not None:
 
66
                error = ''.join(error)
 
67
            commands.append((cmd, input, output, error))
 
68
 
 
69
    cmd_cur = None
 
70
    cmd_line = 1
 
71
    lineno = 0
 
72
    input, output, error = None, None, None
 
73
    for line in text.split('\n'):
 
74
        lineno += 1
 
75
        # Keep a copy for error reporting
 
76
        orig = line
 
77
        comment =  line.find('#')
 
78
        if comment >= 0:
 
79
            # Delete comments
 
80
            line = line[0:comment]
 
81
            line = line.rstrip()
 
82
        if line == '':
 
83
            # Ignore empty lines
 
84
            continue
 
85
        if line.startswith('$'):
 
86
            # Time to output the current command
 
87
            add_command(cmd_cur, input, output, error)
 
88
            # And start a new one
 
89
            cmd_cur = list(split(line[1:]))
 
90
            cmd_line = lineno
 
91
            input, output, error = None, None, None
 
92
        elif line.startswith('<'):
 
93
            if input is None:
 
94
                if cmd_cur is None:
 
95
                    raise SyntaxError('No command for that input',
 
96
                                      (file_name, lineno, 1, orig))
 
97
                input = []
 
98
            input.append(line[1:] + '\n')
 
99
        elif line.startswith('2>'):
 
100
            if error is None:
 
101
                if cmd_cur is None:
 
102
                    raise SyntaxError('No command for that error',
 
103
                                      (file_name, lineno, 1, orig))
 
104
                error = []
 
105
            error.append(line[2:] + '\n')
 
106
        else:
 
107
            if output is None:
 
108
                if cmd_cur is None:
 
109
                    raise SyntaxError('No command for that output',
 
110
                                      (file_name, lineno, 1, orig))
 
111
                output = []
 
112
            output.append(line + '\n')
 
113
    # Add the last seen command
 
114
    add_command(cmd_cur, input, output, error)
 
115
    return commands
 
116
 
 
117
 
 
118
def _scan_redirection_options(args):
 
119
    """Recognize and process input and output redirections.
 
120
 
 
121
    :param args: The command line arguments
 
122
 
 
123
    :return: A tuple containing: 
 
124
        - The file name redirected from or None
 
125
        - The file name redirected to or None
 
126
        - The mode to open the output file or None
 
127
        - The reamining arguments
 
128
    """
 
129
    def redirected_file_name(direction, name, args):
 
130
        if name == '':
 
131
            try:
 
132
                name = args.pop(0)
 
133
            except IndexError:
 
134
                # We leave the error handling to higher levels, an empty name
 
135
                # can't be legal.
 
136
                name = ''
 
137
        return name
 
138
 
 
139
    remaining = []
 
140
    in_name = None
 
141
    out_name, out_mode = None, None
 
142
    while args:
 
143
        arg = args.pop(0)
 
144
        if arg.startswith('<'):
 
145
            in_name = redirected_file_name('<', arg[1:], args)
 
146
        elif arg.startswith('>>'):
 
147
            out_name = redirected_file_name('>>', arg[2:], args)
 
148
            out_mode = 'ab+'
 
149
        elif arg.startswith('>',):
 
150
            out_name = redirected_file_name('>', arg[1:], args)
 
151
            out_mode = 'wb+'
 
152
        else:
 
153
            remaining.append(arg)
 
154
    return in_name, out_name, out_mode, remaining
 
155
 
 
156
 
 
157
class ScriptRunner(object):
 
158
 
 
159
    def __init__(self, test_case):
 
160
        self.test_case = test_case
 
161
        self.output_checker = doctest.OutputChecker()
 
162
        self.check_options = doctest.ELLIPSIS
 
163
 
 
164
    def run_script(self, text):
 
165
        for cmd, input, output, error in _script_to_commands(text):
 
166
            self.run_command(cmd, input, output, error)
 
167
 
 
168
    def _check_output(self, expected, actual):
 
169
        if expected is None:
 
170
            # Specifying None means: any output is accepted
 
171
            return
 
172
        if actual is None:
 
173
            self.test_case.fail('Unexpected: %s' % actual)
 
174
        matching = self.output_checker.check_output(
 
175
            expected, actual, self.check_options)
 
176
        if not matching:
 
177
            # Note that we can't use output_checker.output_difference() here
 
178
            # because... the API is broken ('expected' must be a doctest
 
179
            # specific object of which a 'want' attribute will be our
 
180
            # 'expected' parameter. So we just fallback to our good old
 
181
            # assertEqualDiff since we know there *are* differences and the
 
182
            # output should be decently readable.
 
183
            self.test_case.assertEqualDiff(expected, actual)
 
184
 
 
185
    def _pre_process_args(self, args):
 
186
        new_args = []
 
187
        for arg in args:
 
188
            # Strip the simple and double quotes since we don't care about
 
189
            # them.  We leave the backquotes in place though since they have a
 
190
            # different semantic.
 
191
            if arg[0] in  ('"', "'") and arg[0] == arg[-1]:
 
192
                yield arg[1:-1]
 
193
            else:
 
194
                if glob.has_magic(arg):
 
195
                    matches = glob.glob(arg)
 
196
                    if matches:
 
197
                        # We care more about order stability than performance
 
198
                        # here
 
199
                        matches.sort()
 
200
                        for m in matches:
 
201
                            yield m
 
202
                else:
 
203
                    yield arg
 
204
 
 
205
    def run_command(self, cmd, input, output, error):
 
206
        mname = 'do_' + cmd[0]
 
207
        method = getattr(self, mname, None)
 
208
        if method is None:
 
209
            raise SyntaxError('Command not found "%s"' % (cmd[0],),
 
210
                              None, 1, ' '.join(cmd))
 
211
        if input is None:
 
212
            str_input = ''
 
213
        else:
 
214
            str_input = ''.join(input)
 
215
        args = list(self._pre_process_args(cmd[1:]))
 
216
        retcode, actual_output, actual_error = method(str_input, args)
 
217
 
 
218
        self._check_output(output, actual_output)
 
219
        self._check_output(error, actual_error)
 
220
        if retcode and not error and actual_error:
 
221
            self.test_case.fail('In \n\t%s\nUnexpected error: %s'
 
222
                                % (' '.join(cmd), actual_error))
 
223
        return retcode, actual_output, actual_error
 
224
 
 
225
    def _read_input(self, input, in_name):
 
226
        if in_name is not None:
 
227
            infile = open(in_name, 'rb')
 
228
            try:
 
229
                # Command redirection takes precedence over provided input
 
230
                input = infile.read()
 
231
            finally:
 
232
                infile.close()
 
233
        return input
 
234
 
 
235
    def _write_output(self, output, out_name, out_mode):
 
236
        if out_name is not None:
 
237
            outfile = open(out_name, out_mode)
 
238
            try:
 
239
                outfile.write(output)
 
240
            finally:
 
241
                outfile.close()
 
242
            output = None
 
243
        return output
 
244
 
 
245
    def do_bzr(self, input, args):
 
246
        retcode, out, err = self.test_case._run_bzr_core(
 
247
            args, retcode=None, encoding=None, stdin=input, working_dir=None)
 
248
        return retcode, out, err
 
249
 
 
250
    def do_cat(self, input, args):
 
251
        (in_name, out_name, out_mode, args) = _scan_redirection_options(args)
 
252
        if args and in_name is not None:
 
253
            raise SyntaxError('Specify a file OR use redirection')
 
254
 
 
255
        inputs = []
 
256
        if input:
 
257
            inputs.append(input)
 
258
        input_names = args
 
259
        if in_name:
 
260
            args.append(in_name)
 
261
        for in_name in input_names:
 
262
            try:
 
263
                inputs.append(self._read_input(None, in_name))
 
264
            except IOError, e:
 
265
                if e.errno == errno.ENOENT:
 
266
                    return (1, None,
 
267
                            '%s: No such file or directory\n' % (in_name,))
 
268
        # Basically cat copy input to output
 
269
        output = ''.join(inputs)
 
270
        # Handle output redirections
 
271
        try:
 
272
            output = self._write_output(output, out_name, out_mode)
 
273
        except IOError, e:
 
274
            if e.errno == errno.ENOENT:
 
275
                return 1, None, '%s: No such file or directory\n' % (out_name,)
 
276
        return 0, output, None
 
277
 
 
278
    def do_echo(self, input, args):
 
279
        (in_name, out_name, out_mode, args) = _scan_redirection_options(args)
 
280
        if input and args:
 
281
                raise SyntaxError('Specify parameters OR use redirection')
 
282
        if args:
 
283
            input = ' '.join(args)
 
284
        try:
 
285
            input = self._read_input(input, in_name)
 
286
        except IOError, e:
 
287
            if e.errno == errno.ENOENT:
 
288
                return 1, None, '%s: No such file or directory\n' % (in_name,)
 
289
        # Always append a \n'
 
290
        input += '\n'
 
291
        # Process output
 
292
        output = input
 
293
        # Handle output redirections
 
294
        try:
 
295
            output = self._write_output(output, out_name, out_mode)
 
296
        except IOError, e:
 
297
            if e.errno == errno.ENOENT:
 
298
                return 1, None, '%s: No such file or directory\n' % (out_name,)
 
299
        return 0, output, None
 
300
 
 
301
    def _ensure_in_jail(self, path):
 
302
        jail_root = self.test_case.get_jail_root()
 
303
        if not osutils.is_inside(jail_root, osutils.normalizepath(path)):
 
304
            raise ValueError('%s is not inside %s' % (path, jail_root))
 
305
 
 
306
    def do_cd(self, input, args):
 
307
        if len(args) > 1:
 
308
            raise SyntaxError('Usage: cd [dir]')
 
309
        if len(args) == 1:
 
310
            d = args[0]
 
311
            self._ensure_in_jail(d)
 
312
        else:
 
313
            d = self.test_case.get_jail_root()
 
314
        os.chdir(d)
 
315
        return 0, None, None
 
316
 
 
317
    def do_mkdir(self, input, args):
 
318
        if not args or len(args) != 1:
 
319
            raise SyntaxError('Usage: mkdir dir')
 
320
        d = args[0]
 
321
        self._ensure_in_jail(d)
 
322
        os.mkdir(d)
 
323
        return 0, None, None
 
324
 
 
325
    def do_rm(self, input, args):
 
326
        err = None
 
327
 
 
328
        def error(msg, path):
 
329
            return  "rm: cannot remove '%s': %s\n" % (path, msg)
 
330
 
 
331
        force, recursive = False, False
 
332
        opts = None
 
333
        if args and args[0][0] == '-':
 
334
            opts = args.pop(0)[1:]
 
335
            if 'f' in opts:
 
336
                force = True
 
337
                opts = opts.replace('f', '', 1)
 
338
            if 'r' in opts:
 
339
                recursive = True
 
340
                opts = opts.replace('r', '', 1)
 
341
        if not args or opts:
 
342
            raise SyntaxError('Usage: rm [-fr] path+')
 
343
        for p in args:
 
344
            self._ensure_in_jail(p)
 
345
            # FIXME: Should we put that in osutils ?
 
346
            try:
 
347
                os.remove(p)
 
348
            except OSError, e:
 
349
                if e.errno == errno.EISDIR:
 
350
                    if recursive:
 
351
                        osutils.rmtree(p)
 
352
                    else:
 
353
                        err = error('Is a directory', p)
 
354
                        break
 
355
                elif e.errno == errno.ENOENT:
 
356
                    if not force:
 
357
                        err =  error('No such file or directory', p)
 
358
                        break
 
359
                else:
 
360
                    raise
 
361
        if err:
 
362
            retcode = 1
 
363
        else:
 
364
            retcode = 0
 
365
        return retcode, None, err
 
366
 
 
367
 
 
368
class TestCaseWithMemoryTransportAndScript(tests.TestCaseWithMemoryTransport):
 
369
 
 
370
    def setUp(self):
 
371
        super(TestCaseWithMemoryTransportAndScript, self).setUp()
 
372
        self.script_runner = ScriptRunner(self)
 
373
        # Break the circular dependency
 
374
        def break_dependency():
 
375
            self.script_runner = None
 
376
        self.addCleanup(break_dependency)
 
377
 
 
378
    def get_jail_root(self):
 
379
        raise NotImplementedError(self.get_jail_root)
 
380
 
 
381
    def run_script(self, script):
 
382
        return self.script_runner.run_script(script)
 
383
 
 
384
    def run_command(self, cmd, input, output, error):
 
385
        return self.script_runner.run_command(cmd, input, output, error)
 
386
 
 
387
 
 
388
class TestCaseWithTransportAndScript(tests.TestCaseWithTransport):
 
389
 
 
390
    def setUp(self):
 
391
        super(TestCaseWithTransportAndScript, self).setUp()
 
392
        self.script_runner = ScriptRunner(self)
 
393
        # Break the circular dependency
 
394
        def break_dependency():
 
395
            self.script_runner = None
 
396
        self.addCleanup(break_dependency)
 
397
 
 
398
    def get_jail_root(self):
 
399
        return self.test_dir
 
400
 
 
401
    def run_script(self, script):
 
402
        return self.script_runner.run_script(script)
 
403
 
 
404
    def run_command(self, cmd, input, output, error):
 
405
        return self.script_runner.run_command(cmd, input, output, error)