/brz/remove-bazaar

To get this branch, use:
bzr branch http://gegoxaren.bato24.eu/bzr/brz/remove-bazaar

« back to all changes in this revision

Viewing changes to breezy/tests/script.py

  • Committer: John Arbash Meinel
  • Date: 2006-04-25 15:05:42 UTC
  • mfrom: (1185.85.85 bzr-encoding)
  • mto: This revision was merged to the branch mainline in revision 1752.
  • Revision ID: john@arbash-meinel.com-20060425150542-c7b518dca9928691
[merge] the old bzr-encoding changes, reparenting them on bzr.dev

Show diffs side-by-side

added added

removed removed

Lines of Context:
1
 
# Copyright (C) 2009, 2010, 2011 Canonical Ltd
2
 
#
3
 
# This program is free software; you can redistribute it and/or modify
4
 
# it under the terms of the GNU General Public License as published by
5
 
# the Free Software Foundation; either version 2 of the License, or
6
 
# (at your option) any later version.
7
 
#
8
 
# This program is distributed in the hope that it will be useful,
9
 
# but WITHOUT ANY WARRANTY; without even the implied warranty of
10
 
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
11
 
# GNU General Public License for more details.
12
 
#
13
 
# You should have received a copy of the GNU General Public License
14
 
# along with this program; if not, write to the Free Software
15
 
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
16
 
 
17
 
"""Shell-like test scripts.
18
 
 
19
 
See developers/testing.html for more explanations.
20
 
"""
21
 
 
22
 
import doctest
23
 
import errno
24
 
import glob
25
 
import logging
26
 
import os
27
 
import shlex
28
 
import sys
29
 
import textwrap
30
 
 
31
 
from .. import (
32
 
    osutils,
33
 
    tests,
34
 
    trace,
35
 
    )
36
 
from ..tests import ui_testing
37
 
 
38
 
 
39
 
def split(s):
40
 
    """Split a command line respecting quotes."""
41
 
    scanner = shlex.shlex(s)
42
 
    scanner.quotes = '\'"`'
43
 
    scanner.whitespace_split = True
44
 
    for t in list(scanner):
45
 
        yield t
46
 
 
47
 
 
48
 
def _script_to_commands(text, file_name=None):
49
 
    """Turn a script into a list of commands with their associated IOs.
50
 
 
51
 
    Each command appears on a line by itself starting with '$ '. It can be
52
 
    associated with an input that will feed it and an expected output.
53
 
 
54
 
    Comments starts with '#' until the end of line.
55
 
    Empty lines are ignored.
56
 
 
57
 
    Input and output are full lines terminated by a '\n'.
58
 
 
59
 
    Input lines start with '<'.
60
 
    Output lines start with nothing.
61
 
    Error lines start with '2>'.
62
 
 
63
 
    :return: A sequence of ([args], input, output, errors), where the args are
64
 
        split in to words, and the input, output, and errors are just strings,
65
 
        typically containing newlines.
66
 
    """
67
 
 
68
 
    commands = []
69
 
 
70
 
    def add_command(cmd, input, output, error):
71
 
        if cmd is not None:
72
 
            if input is not None:
73
 
                input = ''.join(input)
74
 
            if output is not None:
75
 
                output = ''.join(output)
76
 
            if error is not None:
77
 
                error = ''.join(error)
78
 
            commands.append((cmd, input, output, error))
79
 
 
80
 
    cmd_cur = None
81
 
    cmd_line = 1
82
 
    lineno = 0
83
 
    input, output, error = None, None, None
84
 
    text = textwrap.dedent(text)
85
 
    lines = text.split('\n')
86
 
    # to make use of triple-quoted strings easier, we ignore a blank line
87
 
    # right at the start and right at the end; the rest are meaningful
88
 
    if lines and lines[0] == '':
89
 
        del lines[0]
90
 
    if lines and lines[-1] == '':
91
 
        del lines[-1]
92
 
    for line in lines:
93
 
        lineno += 1
94
 
        # Keep a copy for error reporting
95
 
        orig = line
96
 
        comment = line.find('#')
97
 
        if comment >= 0:
98
 
            # Delete comments
99
 
            # NB: this syntax means comments are allowed inside output, which
100
 
            # may be confusing...
101
 
            line = line[0:comment]
102
 
            line = line.rstrip()
103
 
            if line == '':
104
 
                continue
105
 
        if line.startswith('$'):
106
 
            # Time to output the current command
107
 
            add_command(cmd_cur, input, output, error)
108
 
            # And start a new one
109
 
            cmd_cur = list(split(line[1:]))
110
 
            cmd_line = lineno
111
 
            input, output, error = None, None, None
112
 
        elif line.startswith('<'):
113
 
            if input is None:
114
 
                if cmd_cur is None:
115
 
                    raise SyntaxError('No command for that input',
116
 
                                      (file_name, lineno, 1, orig))
117
 
                input = []
118
 
            input.append(line[1:] + '\n')
119
 
        elif line.startswith('2>'):
120
 
            if error is None:
121
 
                if cmd_cur is None:
122
 
                    raise SyntaxError('No command for that error',
123
 
                                      (file_name, lineno, 1, orig))
124
 
                error = []
125
 
            error.append(line[2:] + '\n')
126
 
        else:
127
 
            # can happen if the first line is not recognized as a command, eg
128
 
            # if the prompt has leading whitespace
129
 
            if output is None:
130
 
                if cmd_cur is None:
131
 
                    raise SyntaxError('No command for line %r' % (line,),
132
 
                                      (file_name, lineno, 1, orig))
133
 
                output = []
134
 
            output.append(line + '\n')
135
 
    # Add the last seen command
136
 
    add_command(cmd_cur, input, output, error)
137
 
    return commands
138
 
 
139
 
 
140
 
def _scan_redirection_options(args):
141
 
    """Recognize and process input and output redirections.
142
 
 
143
 
    :param args: The command line arguments
144
 
 
145
 
    :return: A tuple containing:
146
 
        - The file name redirected from or None
147
 
        - The file name redirected to or None
148
 
        - The mode to open the output file or None
149
 
        - The reamining arguments
150
 
    """
151
 
    def redirected_file_name(direction, name, args):
152
 
        if name == '':
153
 
            try:
154
 
                name = args.pop(0)
155
 
            except IndexError:
156
 
                # We leave the error handling to higher levels, an empty name
157
 
                # can't be legal.
158
 
                name = ''
159
 
        return name
160
 
 
161
 
    remaining = []
162
 
    in_name = None
163
 
    out_name, out_mode = None, None
164
 
    while args:
165
 
        arg = args.pop(0)
166
 
        if arg.startswith('<'):
167
 
            in_name = redirected_file_name('<', arg[1:], args)
168
 
        elif arg.startswith('>>'):
169
 
            out_name = redirected_file_name('>>', arg[2:], args)
170
 
            out_mode = 'a+'
171
 
        elif arg.startswith('>',):
172
 
            out_name = redirected_file_name('>', arg[1:], args)
173
 
            out_mode = 'w+'
174
 
        else:
175
 
            remaining.append(arg)
176
 
    return in_name, out_name, out_mode, remaining
177
 
 
178
 
 
179
 
class ScriptRunner(object):
180
 
    """Run a shell-like script from a test.
181
 
 
182
 
    Can be used as:
183
 
 
184
 
    from breezy.tests import script
185
 
 
186
 
    ...
187
 
 
188
 
        def test_bug_nnnnn(self):
189
 
            sr = script.ScriptRunner()
190
 
            sr.run_script(self, '''
191
 
            $ brz init
192
 
            $ brz do-this
193
 
            # Boom, error
194
 
            ''')
195
 
    """
196
 
 
197
 
    def __init__(self):
198
 
        self.output_checker = doctest.OutputChecker()
199
 
        self.check_options = doctest.ELLIPSIS
200
 
 
201
 
    def run_script(self, test_case, text, null_output_matches_anything=False):
202
 
        """Run a shell-like script as a test.
203
 
 
204
 
        :param test_case: A TestCase instance that should provide the fail(),
205
 
            assertEqualDiff and _run_bzr_core() methods as well as a 'test_dir'
206
 
            attribute used as a jail root.
207
 
 
208
 
        :param text: A shell-like script (see _script_to_commands for syntax).
209
 
 
210
 
        :param null_output_matches_anything: For commands with no specified
211
 
            output, ignore any output that does happen, including output on
212
 
            standard error.
213
 
        """
214
 
        self.null_output_matches_anything = null_output_matches_anything
215
 
        for cmd, input, output, error in _script_to_commands(text):
216
 
            self.run_command(test_case, cmd, input, output, error)
217
 
 
218
 
    def run_command(self, test_case, cmd, input, output, error):
219
 
        mname = 'do_' + cmd[0]
220
 
        method = getattr(self, mname, None)
221
 
        if method is None:
222
 
            raise SyntaxError('Command not found "%s"' % (cmd[0],),
223
 
                              (None, 1, 1, ' '.join(cmd)))
224
 
        if input is None:
225
 
            str_input = ''
226
 
        else:
227
 
            str_input = ''.join(input)
228
 
        args = list(self._pre_process_args(cmd[1:]))
229
 
        retcode, actual_output, actual_error = method(test_case,
230
 
                                                      str_input, args)
231
 
 
232
 
        try:
233
 
            self._check_output(output, actual_output, test_case)
234
 
        except AssertionError as e:
235
 
            raise AssertionError(str(e) + " in stdout of command %s" % cmd)
236
 
        try:
237
 
            self._check_output(error, actual_error, test_case)
238
 
        except AssertionError as e:
239
 
            raise AssertionError(str(e)
240
 
                                 + " in stderr of running command %s" % cmd)
241
 
        if retcode and not error and actual_error:
242
 
            test_case.fail('In \n\t%s\nUnexpected error: %s'
243
 
                           % (' '.join(cmd), actual_error))
244
 
        return retcode, actual_output, actual_error
245
 
 
246
 
    def _check_output(self, expected, actual, test_case):
247
 
        if not actual:
248
 
            if expected is None:
249
 
                return
250
 
            elif expected == '...\n':
251
 
                return
252
 
            else:
253
 
                test_case.fail('expected output: %r, but found nothing'
254
 
                               % (expected,))
255
 
 
256
 
        null_output_matches_anything = getattr(
257
 
            self, 'null_output_matches_anything', False)
258
 
        if null_output_matches_anything and expected is None:
259
 
            return
260
 
 
261
 
        expected = expected or ''
262
 
        matching = self.output_checker.check_output(
263
 
            expected, actual, self.check_options)
264
 
        if not matching:
265
 
            # Note that we can't use output_checker.output_difference() here
266
 
            # because... the API is broken ('expected' must be a doctest
267
 
            # specific object of which a 'want' attribute will be our
268
 
            # 'expected' parameter. So we just fallback to our good old
269
 
            # assertEqualDiff since we know there *are* differences and the
270
 
            # output should be decently readable.
271
 
            #
272
 
            # As a special case, we allow output that's missing a final
273
 
            # newline to match an expected string that does have one, so that
274
 
            # we can match a prompt printed on one line, then input given on
275
 
            # the next line.
276
 
            if expected == actual + '\n':
277
 
                pass
278
 
            else:
279
 
                test_case.assertEqualDiff(expected, actual)
280
 
 
281
 
    def _pre_process_args(self, args):
282
 
        new_args = []
283
 
        for arg in args:
284
 
            # Strip the simple and double quotes since we don't care about
285
 
            # them.  We leave the backquotes in place though since they have a
286
 
            # different semantic.
287
 
            if arg[0] in ('"', "'") and arg[0] == arg[-1]:
288
 
                yield arg[1:-1]
289
 
            else:
290
 
                if glob.has_magic(arg):
291
 
                    matches = glob.glob(arg)
292
 
                    if matches:
293
 
                        # We care more about order stability than performance
294
 
                        # here
295
 
                        matches.sort()
296
 
                        for m in matches:
297
 
                            yield m
298
 
                else:
299
 
                    yield arg
300
 
 
301
 
    def _read_input(self, input, in_name):
302
 
        if in_name is not None:
303
 
            infile = open(in_name, 'r')
304
 
            try:
305
 
                # Command redirection takes precedence over provided input
306
 
                input = infile.read()
307
 
            finally:
308
 
                infile.close()
309
 
        return input
310
 
 
311
 
    def _write_output(self, output, out_name, out_mode):
312
 
        if out_name is not None:
313
 
            outfile = open(out_name, out_mode)
314
 
            try:
315
 
                outfile.write(output)
316
 
            finally:
317
 
                outfile.close()
318
 
            output = None
319
 
        return output
320
 
 
321
 
    def do_brz(self, test_case, input, args):
322
 
        encoding = osutils.get_user_encoding()
323
 
        if sys.version_info[0] == 2:
324
 
            stdout = ui_testing.BytesIOWithEncoding()
325
 
            stderr = ui_testing.BytesIOWithEncoding()
326
 
            stdout.encoding = stderr.encoding = encoding
327
 
 
328
 
            # FIXME: don't call into logging here
329
 
            handler = trace.EncodedStreamHandler(
330
 
                stderr, errors="replace")
331
 
        else:
332
 
            stdout = ui_testing.StringIOWithEncoding()
333
 
            stderr = ui_testing.StringIOWithEncoding()
334
 
            stdout.encoding = stderr.encoding = encoding
335
 
            handler = logging.StreamHandler(stderr)
336
 
        handler.setLevel(logging.INFO)
337
 
 
338
 
        logger = logging.getLogger('')
339
 
        logger.addHandler(handler)
340
 
        try:
341
 
            retcode = test_case._run_bzr_core(
342
 
                args, encoding=encoding, stdin=input, stdout=stdout,
343
 
                stderr=stderr, working_dir=None)
344
 
        finally:
345
 
            logger.removeHandler(handler)
346
 
 
347
 
        return retcode, stdout.getvalue(), stderr.getvalue()
348
 
 
349
 
    def do_cat(self, test_case, input, args):
350
 
        (in_name, out_name, out_mode, args) = _scan_redirection_options(args)
351
 
        if args and in_name is not None:
352
 
            raise SyntaxError('Specify a file OR use redirection')
353
 
 
354
 
        inputs = []
355
 
        if input:
356
 
            inputs.append(input)
357
 
        input_names = args
358
 
        if in_name:
359
 
            args.append(in_name)
360
 
        for in_name in input_names:
361
 
            try:
362
 
                inputs.append(self._read_input(None, in_name))
363
 
            except IOError as e:
364
 
                # Some filenames are illegal on Windows and generate EINVAL
365
 
                # rather than just saying the filename doesn't exist
366
 
                if e.errno in (errno.ENOENT, errno.EINVAL):
367
 
                    return (1, None,
368
 
                            '%s: No such file or directory\n' % (in_name,))
369
 
                raise
370
 
        # Basically cat copy input to output
371
 
        output = ''.join(inputs)
372
 
        # Handle output redirections
373
 
        try:
374
 
            output = self._write_output(output, out_name, out_mode)
375
 
        except IOError as e:
376
 
            # If out_name cannot be created, we may get 'ENOENT', however if
377
 
            # out_name is something like '', we can get EINVAL
378
 
            if e.errno in (errno.ENOENT, errno.EINVAL):
379
 
                return 1, None, '%s: No such file or directory\n' % (out_name,)
380
 
            raise
381
 
        return 0, output, None
382
 
 
383
 
    def do_echo(self, test_case, input, args):
384
 
        (in_name, out_name, out_mode, args) = _scan_redirection_options(args)
385
 
        if input or in_name:
386
 
            raise SyntaxError('echo doesn\'t read from stdin')
387
 
        if args:
388
 
            input = ' '.join(args)
389
 
        # Always append a \n'
390
 
        input += '\n'
391
 
        # Process output
392
 
        output = input
393
 
        # Handle output redirections
394
 
        try:
395
 
            output = self._write_output(output, out_name, out_mode)
396
 
        except IOError as e:
397
 
            if e.errno in (errno.ENOENT, errno.EINVAL):
398
 
                return 1, None, '%s: No such file or directory\n' % (out_name,)
399
 
            raise
400
 
        return 0, output, None
401
 
 
402
 
    def _get_jail_root(self, test_case):
403
 
        return test_case.test_dir
404
 
 
405
 
    def _ensure_in_jail(self, test_case, path):
406
 
        jail_root = self._get_jail_root(test_case)
407
 
        if not osutils.is_inside(jail_root, osutils.normalizepath(path)):
408
 
            raise ValueError('%s is not inside %s' % (path, jail_root))
409
 
 
410
 
    def do_cd(self, test_case, input, args):
411
 
        if len(args) > 1:
412
 
            raise SyntaxError('Usage: cd [dir]')
413
 
        if len(args) == 1:
414
 
            d = args[0]
415
 
            self._ensure_in_jail(test_case, d)
416
 
        else:
417
 
            # The test "home" directory is the root of its jail
418
 
            d = self._get_jail_root(test_case)
419
 
        os.chdir(d)
420
 
        return 0, None, None
421
 
 
422
 
    def do_mkdir(self, test_case, input, args):
423
 
        if not args or len(args) != 1:
424
 
            raise SyntaxError('Usage: mkdir dir')
425
 
        d = args[0]
426
 
        self._ensure_in_jail(test_case, d)
427
 
        os.mkdir(d)
428
 
        return 0, None, None
429
 
 
430
 
    def do_rm(self, test_case, input, args):
431
 
        err = None
432
 
 
433
 
        def error(msg, path):
434
 
            return "rm: cannot remove '%s': %s\n" % (path, msg)
435
 
 
436
 
        force, recursive = False, False
437
 
        opts = None
438
 
        if args and args[0][0] == '-':
439
 
            opts = args.pop(0)[1:]
440
 
            if 'f' in opts:
441
 
                force = True
442
 
                opts = opts.replace('f', '', 1)
443
 
            if 'r' in opts:
444
 
                recursive = True
445
 
                opts = opts.replace('r', '', 1)
446
 
        if not args or opts:
447
 
            raise SyntaxError('Usage: rm [-fr] path+')
448
 
        for p in args:
449
 
            self._ensure_in_jail(test_case, p)
450
 
            # FIXME: Should we put that in osutils ?
451
 
            try:
452
 
                os.remove(p)
453
 
            except OSError as e:
454
 
                # Various OSes raises different exceptions (linux: EISDIR,
455
 
                #   win32: EACCES, OSX: EPERM) when invoked on a directory
456
 
                if e.errno in (errno.EISDIR, errno.EPERM, errno.EACCES):
457
 
                    if recursive:
458
 
                        osutils.rmtree(p)
459
 
                    else:
460
 
                        err = error('Is a directory', p)
461
 
                        break
462
 
                elif e.errno == errno.ENOENT:
463
 
                    if not force:
464
 
                        err = error('No such file or directory', p)
465
 
                        break
466
 
                else:
467
 
                    raise
468
 
        if err:
469
 
            retcode = 1
470
 
        else:
471
 
            retcode = 0
472
 
        return retcode, None, err
473
 
 
474
 
    def do_mv(self, test_case, input, args):
475
 
        err = None
476
 
 
477
 
        def error(msg, src, dst):
478
 
            return "mv: cannot move %s to %s: %s\n" % (src, dst, msg)
479
 
 
480
 
        if not args or len(args) != 2:
481
 
            raise SyntaxError("Usage: mv path1 path2")
482
 
        src, dst = args
483
 
        try:
484
 
            real_dst = dst
485
 
            if os.path.isdir(dst):
486
 
                real_dst = os.path.join(dst, os.path.basename(src))
487
 
            os.rename(src, real_dst)
488
 
        except OSError as e:
489
 
            if e.errno == errno.ENOENT:
490
 
                err = error('No such file or directory', src, dst)
491
 
            else:
492
 
                raise
493
 
        if err:
494
 
            retcode = 1
495
 
        else:
496
 
            retcode = 0
497
 
        return retcode, None, err
498
 
 
499
 
 
500
 
class TestCaseWithMemoryTransportAndScript(tests.TestCaseWithMemoryTransport):
501
 
    """Helper class to experiment shell-like test and memory fs.
502
 
 
503
 
    This not intended to be used outside of experiments in implementing memoy
504
 
    based file systems and evolving bzr so that test can use only memory based
505
 
    resources.
506
 
    """
507
 
 
508
 
    def setUp(self):
509
 
        super(TestCaseWithMemoryTransportAndScript, self).setUp()
510
 
        self.script_runner = ScriptRunner()
511
 
        # FIXME: See shelf_ui.Shelver._char_based. This allow using shelve in
512
 
        # scripts while providing a line-based input (better solution in
513
 
        # progress). -- vila 2011-09-28
514
 
        self.overrideEnv('INSIDE_EMACS', '1')
515
 
 
516
 
    def run_script(self, script, null_output_matches_anything=False):
517
 
        return self.script_runner.run_script(self, script,
518
 
                                             null_output_matches_anything=null_output_matches_anything)
519
 
 
520
 
    def run_command(self, cmd, input, output, error):
521
 
        return self.script_runner.run_command(self, cmd, input, output, error)
522
 
 
523
 
 
524
 
class TestCaseWithTransportAndScript(tests.TestCaseWithTransport):
525
 
    """Helper class to quickly define shell-like tests.
526
 
 
527
 
    Can be used as:
528
 
 
529
 
    from breezy.tests import script
530
 
 
531
 
 
532
 
    class TestBug(script.TestCaseWithTransportAndScript):
533
 
 
534
 
        def test_bug_nnnnn(self):
535
 
            self.run_script('''
536
 
            $ brz init
537
 
            $ brz do-this
538
 
            # Boom, error
539
 
            ''')
540
 
    """
541
 
 
542
 
    def setUp(self):
543
 
        super(TestCaseWithTransportAndScript, self).setUp()
544
 
        self.script_runner = ScriptRunner()
545
 
        # FIXME: See shelf_ui.Shelver._char_based. This allow using shelve in
546
 
        # scripts while providing a line-based input (better solution in
547
 
        # progress). -- vila 2011-09-28
548
 
        self.overrideEnv('INSIDE_EMACS', '1')
549
 
 
550
 
    def run_script(self, script, null_output_matches_anything=False):
551
 
        return self.script_runner.run_script(self, script,
552
 
                                             null_output_matches_anything=null_output_matches_anything)
553
 
 
554
 
    def run_command(self, cmd, input, output, error):
555
 
        return self.script_runner.run_command(self, cmd, input, output, error)
556
 
 
557
 
 
558
 
def run_script(test_case, script_string, null_output_matches_anything=False):
559
 
    """Run the given script within a testcase"""
560
 
    return ScriptRunner().run_script(test_case, script_string,
561
 
                                     null_output_matches_anything=null_output_matches_anything)