1
# Copyright (C) 2009 Canonical Ltd
3
# This program is free software; you can redistribute it and/or modify
4
# it under the terms of the GNU General Public License as published by
5
# the Free Software Foundation; either version 2 of the License, or
6
# (at your option) any later version.
8
# This program is distributed in the hope that it will be useful,
9
# but WITHOUT ANY WARRANTY; without even the implied warranty of
10
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11
# GNU General Public License for more details.
13
# You should have received a copy of the GNU General Public License
14
# along with this program; if not, write to the Free Software
15
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
17
from cStringIO import StringIO
28
"""Split a command line respecting quotes."""
29
scanner = shlex.shlex(s)
30
scanner.quotes = '\'"`'
31
scanner.whitespace_split = True
32
for t in list(scanner):
33
# Strip the simple and double quotes since we don't care about them.
34
# We leave the backquotes in place though since they have a different
36
if t[0] in ('"', "'") and t[0] == t[-1]:
42
def _script_to_commands(text, file_name=None):
43
"""Turn a script into a list of commands with their associated IOs.
45
Each command appears on a line by itself. It can be associated with an
46
input that will feed it and an expected output.
47
Comments starts with '#' until the end of line.
48
Empty lines are ignored.
49
Input and output are full lines terminated by a '\n'.
50
Input lines start with '<'.
51
Output lines start with '>'.
52
Error lines start with '2>'.
57
def add_command(cmd, input, output, error):
60
input = ''.join(input)
61
if output is not None:
62
output = ''.join(output)
64
error = ''.join(error)
65
commands.append((cmd, input, output, error))
70
input, output, error = None, None, None
71
for line in text.split('\n'):
73
# Keep a copy for error reporting
75
comment = line.find('#')
78
line = line[0:comment]
83
if line.startswith('<'):
86
raise SyntaxError('No command for that input',
87
(file_name, lineno, 1, orig))
89
input.append(line[1:] + '\n')
91
elif line.startswith('>'):
94
raise SyntaxError('No command for that output',
95
(file_name, lineno, 1, orig))
97
output.append(line[1:] + '\n')
99
elif line.startswith('2>'):
102
raise SyntaxError('No command for that error',
103
(file_name, lineno, 1, orig))
105
error.append(line[2:] + '\n')
108
# Time to output the current command
109
add_command(cmd_cur, input, output, error)
110
# And start a new one
111
cmd_cur = list(split(line))
113
input, output, error = None, None, None
114
# Add the last seen command
115
add_command(cmd_cur, input, output, error)
119
def _scan_redirection_options(args):
120
"""Recognize and process input and output redirections.
122
:param args: The command line arguments
124
:return: A tuple containing:
125
- The file name redirected from or None
126
- The file name redirected to or None
127
- The mode to open the output file or None
128
- The reamining arguments
132
out_name, out_mode = None, None
134
if arg.startswith('<'):
136
elif arg.startswith('>>'):
139
elif arg.startswith('>'):
143
remaining.append(arg)
144
return in_name, out_name, out_mode, remaining
147
class TestCaseWithScript(tests.TestCaseWithTransport):
150
super(TestCaseWithScript, self).setUp()
153
def run_script(self, text):
154
for cmd, input, output, error in _script_to_commands(text):
155
self.run_command(cmd, input, output, error)
157
def _check_output(self, expected, actual):
159
# Specifying None means: any output is accepted
161
self.assertEquals(expected, actual)
163
def run_command(self, cmd, input, output, error):
164
mname = 'do_' + cmd[0]
165
method = getattr(self, mname, None)
167
raise SyntaxError('Command not found "%s"' % (cmd[0],),
168
None, 1, ' '.join(cmd))
172
str_input = ''.join(input)
173
actual_output, actual_error = method(str_input, cmd[1:])
175
self._check_output(output, actual_output)
176
self._check_output(error, actual_error)
177
return actual_output, actual_error
179
def _read_input(self, input, in_name):
180
if in_name is not None:
181
infile = open(in_name, 'rb')
183
# Command redirection takes precedence over provided input
184
input = infile.read()
189
def _write_output(self, output, out_name, out_mode):
190
if out_name is not None:
191
outfile = open(out_name, out_mode)
193
outfile.write(output)
199
def do_bzr(self, input, args):
200
out, err = self._run_bzr_core(args, retcode=None, encoding=None,
201
stdin=input, working_dir=None)
204
def do_cat(self, input, args):
205
(in_name, out_name, out_mode, args) = _scan_redirection_options(args)
207
raise SyntaxError('Usage: cat [file1]')
209
if in_name is not None:
210
raise SyntaxError('Specify a file OR use redirection')
212
input = self._read_input(input, in_name)
213
# Basically cat copy input to output
215
# Handle output redirections
216
output = self._write_output(output, out_name, out_mode)
219
def do_echo(self, input, args):
220
(in_name, out_name, out_mode, args) = _scan_redirection_options(args)
222
raise SyntaxError('Specify parameters OR use redirection')
224
input = ''.join(args)
225
input = self._read_input(input, in_name)
226
# Always append a \n'
230
# Handle output redirections
231
output = self._write_output(output, out_name, out_mode)
234
def _ensure_in_jail(self, path):
235
if not osutils.is_inside(self.test_dir, osutils.normalizepath(path)):
236
raise ValueError('%s is not inside %s' % (path, self.test_dir))
238
def do_cd(self, input, args):
240
raise SyntaxError('Usage: cd [dir]')
243
self._ensure_in_jail(d)
249
def do_mkdir(self, input, args):
250
if not args or len(args) != 1:
251
raise SyntaxError('Usage: mkdir dir')
253
self._ensure_in_jail(d)