/brz/remove-bazaar

To get this branch, use:
bzr branch http://gegoxaren.bato24.eu/bzr/brz/remove-bazaar

« back to all changes in this revision

Viewing changes to breezy/tests/script.py

  • Committer: Jelmer Vernooij
  • Date: 2019-01-01 21:08:01 UTC
  • mto: This revision was merged to the branch mainline in revision 7231.
  • Revision ID: jelmer@jelmer.uk-20190101210801-2dlsv7b1lvydmpkl
Fix tests.

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
# Copyright (C) 2009, 2010, 2011 Canonical Ltd
 
2
#
 
3
# This program is free software; you can redistribute it and/or modify
 
4
# it under the terms of the GNU General Public License as published by
 
5
# the Free Software Foundation; either version 2 of the License, or
 
6
# (at your option) any later version.
 
7
#
 
8
# This program is distributed in the hope that it will be useful,
 
9
# but WITHOUT ANY WARRANTY; without even the implied warranty of
 
10
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
11
# GNU General Public License for more details.
 
12
#
 
13
# You should have received a copy of the GNU General Public License
 
14
# along with this program; if not, write to the Free Software
 
15
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
 
16
 
 
17
"""Shell-like test scripts.
 
18
 
 
19
See developers/testing.html for more explanations.
 
20
"""
 
21
 
 
22
import doctest
 
23
import errno
 
24
import glob
 
25
import logging
 
26
import os
 
27
import shlex
 
28
import sys
 
29
import textwrap
 
30
 
 
31
from .. import (
 
32
    osutils,
 
33
    tests,
 
34
    trace,
 
35
    )
 
36
from ..tests import ui_testing
 
37
 
 
38
 
 
39
def split(s):
 
40
    """Split a command line respecting quotes."""
 
41
    scanner = shlex.shlex(s)
 
42
    scanner.quotes = '\'"`'
 
43
    scanner.whitespace_split = True
 
44
    for t in list(scanner):
 
45
        yield t
 
46
 
 
47
 
 
48
def _script_to_commands(text, file_name=None):
 
49
    """Turn a script into a list of commands with their associated IOs.
 
50
 
 
51
    Each command appears on a line by itself starting with '$ '. It can be
 
52
    associated with an input that will feed it and an expected output.
 
53
 
 
54
    Comments starts with '#' until the end of line.
 
55
    Empty lines are ignored.
 
56
 
 
57
    Input and output are full lines terminated by a '\n'.
 
58
 
 
59
    Input lines start with '<'.
 
60
    Output lines start with nothing.
 
61
    Error lines start with '2>'.
 
62
 
 
63
    :return: A sequence of ([args], input, output, errors), where the args are
 
64
        split in to words, and the input, output, and errors are just strings,
 
65
        typically containing newlines.
 
66
    """
 
67
 
 
68
    commands = []
 
69
 
 
70
    def add_command(cmd, input, output, error):
 
71
        if cmd is not None:
 
72
            if input is not None:
 
73
                input = ''.join(input)
 
74
            if output is not None:
 
75
                output = ''.join(output)
 
76
            if error is not None:
 
77
                error = ''.join(error)
 
78
            commands.append((cmd, input, output, error))
 
79
 
 
80
    cmd_cur = None
 
81
    cmd_line = 1
 
82
    lineno = 0
 
83
    input, output, error = None, None, None
 
84
    text = textwrap.dedent(text)
 
85
    lines = text.split('\n')
 
86
    # to make use of triple-quoted strings easier, we ignore a blank line
 
87
    # right at the start and right at the end; the rest are meaningful
 
88
    if lines and lines[0] == '':
 
89
        del lines[0]
 
90
    if lines and lines[-1] == '':
 
91
        del lines[-1]
 
92
    for line in lines:
 
93
        lineno += 1
 
94
        # Keep a copy for error reporting
 
95
        orig = line
 
96
        comment = line.find('#')
 
97
        if comment >= 0:
 
98
            # Delete comments
 
99
            # NB: this syntax means comments are allowed inside output, which
 
100
            # may be confusing...
 
101
            line = line[0:comment]
 
102
            line = line.rstrip()
 
103
            if line == '':
 
104
                continue
 
105
        if line.startswith('$'):
 
106
            # Time to output the current command
 
107
            add_command(cmd_cur, input, output, error)
 
108
            # And start a new one
 
109
            cmd_cur = list(split(line[1:]))
 
110
            cmd_line = lineno
 
111
            input, output, error = None, None, None
 
112
        elif line.startswith('<'):
 
113
            if input is None:
 
114
                if cmd_cur is None:
 
115
                    raise SyntaxError('No command for that input',
 
116
                                      (file_name, lineno, 1, orig))
 
117
                input = []
 
118
            input.append(line[1:] + '\n')
 
119
        elif line.startswith('2>'):
 
120
            if error is None:
 
121
                if cmd_cur is None:
 
122
                    raise SyntaxError('No command for that error',
 
123
                                      (file_name, lineno, 1, orig))
 
124
                error = []
 
125
            error.append(line[2:] + '\n')
 
126
        else:
 
127
            # can happen if the first line is not recognized as a command, eg
 
128
            # if the prompt has leading whitespace
 
129
            if output is None:
 
130
                if cmd_cur is None:
 
131
                    raise SyntaxError('No command for line %r' % (line,),
 
132
                                      (file_name, lineno, 1, orig))
 
133
                output = []
 
134
            output.append(line + '\n')
 
135
    # Add the last seen command
 
136
    add_command(cmd_cur, input, output, error)
 
137
    return commands
 
138
 
 
139
 
 
140
def _scan_redirection_options(args):
 
141
    """Recognize and process input and output redirections.
 
142
 
 
143
    :param args: The command line arguments
 
144
 
 
145
    :return: A tuple containing:
 
146
        - The file name redirected from or None
 
147
        - The file name redirected to or None
 
148
        - The mode to open the output file or None
 
149
        - The reamining arguments
 
150
    """
 
151
    def redirected_file_name(direction, name, args):
 
152
        if name == '':
 
153
            try:
 
154
                name = args.pop(0)
 
155
            except IndexError:
 
156
                # We leave the error handling to higher levels, an empty name
 
157
                # can't be legal.
 
158
                name = ''
 
159
        return name
 
160
 
 
161
    remaining = []
 
162
    in_name = None
 
163
    out_name, out_mode = None, None
 
164
    while args:
 
165
        arg = args.pop(0)
 
166
        if arg.startswith('<'):
 
167
            in_name = redirected_file_name('<', arg[1:], args)
 
168
        elif arg.startswith('>>'):
 
169
            out_name = redirected_file_name('>>', arg[2:], args)
 
170
            out_mode = 'a+'
 
171
        elif arg.startswith('>',):
 
172
            out_name = redirected_file_name('>', arg[1:], args)
 
173
            out_mode = 'w+'
 
174
        else:
 
175
            remaining.append(arg)
 
176
    return in_name, out_name, out_mode, remaining
 
177
 
 
178
 
 
179
class ScriptRunner(object):
 
180
    """Run a shell-like script from a test.
 
181
 
 
182
    Can be used as:
 
183
 
 
184
    from breezy.tests import script
 
185
 
 
186
    ...
 
187
 
 
188
        def test_bug_nnnnn(self):
 
189
            sr = script.ScriptRunner()
 
190
            sr.run_script(self, '''
 
191
            $ brz init
 
192
            $ brz do-this
 
193
            # Boom, error
 
194
            ''')
 
195
    """
 
196
 
 
197
    def __init__(self):
 
198
        self.output_checker = doctest.OutputChecker()
 
199
        self.check_options = doctest.ELLIPSIS
 
200
 
 
201
    def run_script(self, test_case, text, null_output_matches_anything=False):
 
202
        """Run a shell-like script as a test.
 
203
 
 
204
        :param test_case: A TestCase instance that should provide the fail(),
 
205
            assertEqualDiff and _run_bzr_core() methods as well as a 'test_dir'
 
206
            attribute used as a jail root.
 
207
 
 
208
        :param text: A shell-like script (see _script_to_commands for syntax).
 
209
 
 
210
        :param null_output_matches_anything: For commands with no specified
 
211
            output, ignore any output that does happen, including output on
 
212
            standard error.
 
213
        """
 
214
        self.null_output_matches_anything = null_output_matches_anything
 
215
        for cmd, input, output, error in _script_to_commands(text):
 
216
            self.run_command(test_case, cmd, input, output, error)
 
217
 
 
218
    def run_command(self, test_case, cmd, input, output, error):
 
219
        mname = 'do_' + cmd[0]
 
220
        method = getattr(self, mname, None)
 
221
        if method is None:
 
222
            raise SyntaxError('Command not found "%s"' % (cmd[0],),
 
223
                              (None, 1, 1, ' '.join(cmd)))
 
224
        if input is None:
 
225
            str_input = ''
 
226
        else:
 
227
            str_input = ''.join(input)
 
228
        args = list(self._pre_process_args(cmd[1:]))
 
229
        retcode, actual_output, actual_error = method(test_case,
 
230
                                                      str_input, args)
 
231
 
 
232
        try:
 
233
            self._check_output(output, actual_output, test_case)
 
234
        except AssertionError as e:
 
235
            raise AssertionError(str(e) + " in stdout of command %s" % cmd)
 
236
        try:
 
237
            self._check_output(error, actual_error, test_case)
 
238
        except AssertionError as e:
 
239
            raise AssertionError(str(e)
 
240
                                 + " in stderr of running command %s" % cmd)
 
241
        if retcode and not error and actual_error:
 
242
            test_case.fail('In \n\t%s\nUnexpected error: %s'
 
243
                           % (' '.join(cmd), actual_error))
 
244
        return retcode, actual_output, actual_error
 
245
 
 
246
    def _check_output(self, expected, actual, test_case):
 
247
        if not actual:
 
248
            if expected is None:
 
249
                return
 
250
            elif expected == '...\n':
 
251
                return
 
252
            else:
 
253
                test_case.fail('expected output: %r, but found nothing'
 
254
                               % (expected,))
 
255
 
 
256
        null_output_matches_anything = getattr(
 
257
            self, 'null_output_matches_anything', False)
 
258
        if null_output_matches_anything and expected is None:
 
259
            return
 
260
 
 
261
        expected = expected or ''
 
262
        matching = self.output_checker.check_output(
 
263
            expected, actual, self.check_options)
 
264
        if not matching:
 
265
            # Note that we can't use output_checker.output_difference() here
 
266
            # because... the API is broken ('expected' must be a doctest
 
267
            # specific object of which a 'want' attribute will be our
 
268
            # 'expected' parameter. So we just fallback to our good old
 
269
            # assertEqualDiff since we know there *are* differences and the
 
270
            # output should be decently readable.
 
271
            #
 
272
            # As a special case, we allow output that's missing a final
 
273
            # newline to match an expected string that does have one, so that
 
274
            # we can match a prompt printed on one line, then input given on
 
275
            # the next line.
 
276
            if expected == actual + '\n':
 
277
                pass
 
278
            else:
 
279
                test_case.assertEqualDiff(expected, actual)
 
280
 
 
281
    def _pre_process_args(self, args):
 
282
        new_args = []
 
283
        for arg in args:
 
284
            # Strip the simple and double quotes since we don't care about
 
285
            # them.  We leave the backquotes in place though since they have a
 
286
            # different semantic.
 
287
            if arg[0] in ('"', "'") and arg[0] == arg[-1]:
 
288
                yield arg[1:-1]
 
289
            else:
 
290
                if glob.has_magic(arg):
 
291
                    matches = glob.glob(arg)
 
292
                    if matches:
 
293
                        # We care more about order stability than performance
 
294
                        # here
 
295
                        matches.sort()
 
296
                        for m in matches:
 
297
                            yield m
 
298
                else:
 
299
                    yield arg
 
300
 
 
301
    def _read_input(self, input, in_name):
 
302
        if in_name is not None:
 
303
            infile = open(in_name, 'r')
 
304
            try:
 
305
                # Command redirection takes precedence over provided input
 
306
                input = infile.read()
 
307
            finally:
 
308
                infile.close()
 
309
        return input
 
310
 
 
311
    def _write_output(self, output, out_name, out_mode):
 
312
        if out_name is not None:
 
313
            outfile = open(out_name, out_mode)
 
314
            try:
 
315
                outfile.write(output)
 
316
            finally:
 
317
                outfile.close()
 
318
            output = None
 
319
        return output
 
320
 
 
321
    def do_brz(self, test_case, input, args):
 
322
        encoding = osutils.get_user_encoding()
 
323
        if sys.version_info[0] == 2:
 
324
            stdout = ui_testing.BytesIOWithEncoding()
 
325
            stderr = ui_testing.BytesIOWithEncoding()
 
326
            stdout.encoding = stderr.encoding = encoding
 
327
 
 
328
            # FIXME: don't call into logging here
 
329
            handler = trace.EncodedStreamHandler(
 
330
                stderr, errors="replace")
 
331
        else:
 
332
            stdout = ui_testing.StringIOWithEncoding()
 
333
            stderr = ui_testing.StringIOWithEncoding()
 
334
            stdout.encoding = stderr.encoding = encoding
 
335
            handler = logging.StreamHandler(stderr)
 
336
        handler.setLevel(logging.INFO)
 
337
 
 
338
        logger = logging.getLogger('')
 
339
        logger.addHandler(handler)
 
340
        try:
 
341
            retcode = test_case._run_bzr_core(
 
342
                args, encoding=encoding, stdin=input, stdout=stdout,
 
343
                stderr=stderr, working_dir=None)
 
344
        finally:
 
345
            logger.removeHandler(handler)
 
346
 
 
347
        return retcode, stdout.getvalue(), stderr.getvalue()
 
348
 
 
349
    def do_cat(self, test_case, input, args):
 
350
        (in_name, out_name, out_mode, args) = _scan_redirection_options(args)
 
351
        if args and in_name is not None:
 
352
            raise SyntaxError('Specify a file OR use redirection')
 
353
 
 
354
        inputs = []
 
355
        if input:
 
356
            inputs.append(input)
 
357
        input_names = args
 
358
        if in_name:
 
359
            args.append(in_name)
 
360
        for in_name in input_names:
 
361
            try:
 
362
                inputs.append(self._read_input(None, in_name))
 
363
            except IOError as e:
 
364
                # Some filenames are illegal on Windows and generate EINVAL
 
365
                # rather than just saying the filename doesn't exist
 
366
                if e.errno in (errno.ENOENT, errno.EINVAL):
 
367
                    return (1, None,
 
368
                            '%s: No such file or directory\n' % (in_name,))
 
369
                raise
 
370
        # Basically cat copy input to output
 
371
        output = ''.join(inputs)
 
372
        # Handle output redirections
 
373
        try:
 
374
            output = self._write_output(output, out_name, out_mode)
 
375
        except IOError as e:
 
376
            # If out_name cannot be created, we may get 'ENOENT', however if
 
377
            # out_name is something like '', we can get EINVAL
 
378
            if e.errno in (errno.ENOENT, errno.EINVAL):
 
379
                return 1, None, '%s: No such file or directory\n' % (out_name,)
 
380
            raise
 
381
        return 0, output, None
 
382
 
 
383
    def do_echo(self, test_case, input, args):
 
384
        (in_name, out_name, out_mode, args) = _scan_redirection_options(args)
 
385
        if input or in_name:
 
386
            raise SyntaxError('echo doesn\'t read from stdin')
 
387
        if args:
 
388
            input = ' '.join(args)
 
389
        # Always append a \n'
 
390
        input += '\n'
 
391
        # Process output
 
392
        output = input
 
393
        # Handle output redirections
 
394
        try:
 
395
            output = self._write_output(output, out_name, out_mode)
 
396
        except IOError as e:
 
397
            if e.errno in (errno.ENOENT, errno.EINVAL):
 
398
                return 1, None, '%s: No such file or directory\n' % (out_name,)
 
399
            raise
 
400
        return 0, output, None
 
401
 
 
402
    def _get_jail_root(self, test_case):
 
403
        return test_case.test_dir
 
404
 
 
405
    def _ensure_in_jail(self, test_case, path):
 
406
        jail_root = self._get_jail_root(test_case)
 
407
        if not osutils.is_inside(jail_root, osutils.normalizepath(path)):
 
408
            raise ValueError('%s is not inside %s' % (path, jail_root))
 
409
 
 
410
    def do_cd(self, test_case, input, args):
 
411
        if len(args) > 1:
 
412
            raise SyntaxError('Usage: cd [dir]')
 
413
        if len(args) == 1:
 
414
            d = args[0]
 
415
            self._ensure_in_jail(test_case, d)
 
416
        else:
 
417
            # The test "home" directory is the root of its jail
 
418
            d = self._get_jail_root(test_case)
 
419
        os.chdir(d)
 
420
        return 0, None, None
 
421
 
 
422
    def do_mkdir(self, test_case, input, args):
 
423
        if not args or len(args) != 1:
 
424
            raise SyntaxError('Usage: mkdir dir')
 
425
        d = args[0]
 
426
        self._ensure_in_jail(test_case, d)
 
427
        os.mkdir(d)
 
428
        return 0, None, None
 
429
 
 
430
    def do_rm(self, test_case, input, args):
 
431
        err = None
 
432
 
 
433
        def error(msg, path):
 
434
            return "rm: cannot remove '%s': %s\n" % (path, msg)
 
435
 
 
436
        force, recursive = False, False
 
437
        opts = None
 
438
        if args and args[0][0] == '-':
 
439
            opts = args.pop(0)[1:]
 
440
            if 'f' in opts:
 
441
                force = True
 
442
                opts = opts.replace('f', '', 1)
 
443
            if 'r' in opts:
 
444
                recursive = True
 
445
                opts = opts.replace('r', '', 1)
 
446
        if not args or opts:
 
447
            raise SyntaxError('Usage: rm [-fr] path+')
 
448
        for p in args:
 
449
            self._ensure_in_jail(test_case, p)
 
450
            # FIXME: Should we put that in osutils ?
 
451
            try:
 
452
                os.remove(p)
 
453
            except OSError as e:
 
454
                # Various OSes raises different exceptions (linux: EISDIR,
 
455
                #   win32: EACCES, OSX: EPERM) when invoked on a directory
 
456
                if e.errno in (errno.EISDIR, errno.EPERM, errno.EACCES):
 
457
                    if recursive:
 
458
                        osutils.rmtree(p)
 
459
                    else:
 
460
                        err = error('Is a directory', p)
 
461
                        break
 
462
                elif e.errno == errno.ENOENT:
 
463
                    if not force:
 
464
                        err = error('No such file or directory', p)
 
465
                        break
 
466
                else:
 
467
                    raise
 
468
        if err:
 
469
            retcode = 1
 
470
        else:
 
471
            retcode = 0
 
472
        return retcode, None, err
 
473
 
 
474
    def do_mv(self, test_case, input, args):
 
475
        err = None
 
476
 
 
477
        def error(msg, src, dst):
 
478
            return "mv: cannot move %s to %s: %s\n" % (src, dst, msg)
 
479
 
 
480
        if not args or len(args) != 2:
 
481
            raise SyntaxError("Usage: mv path1 path2")
 
482
        src, dst = args
 
483
        try:
 
484
            real_dst = dst
 
485
            if os.path.isdir(dst):
 
486
                real_dst = os.path.join(dst, os.path.basename(src))
 
487
            os.rename(src, real_dst)
 
488
        except OSError as e:
 
489
            if e.errno == errno.ENOENT:
 
490
                err = error('No such file or directory', src, dst)
 
491
            else:
 
492
                raise
 
493
        if err:
 
494
            retcode = 1
 
495
        else:
 
496
            retcode = 0
 
497
        return retcode, None, err
 
498
 
 
499
 
 
500
class TestCaseWithMemoryTransportAndScript(tests.TestCaseWithMemoryTransport):
 
501
    """Helper class to experiment shell-like test and memory fs.
 
502
 
 
503
    This not intended to be used outside of experiments in implementing memoy
 
504
    based file systems and evolving bzr so that test can use only memory based
 
505
    resources.
 
506
    """
 
507
 
 
508
    def setUp(self):
 
509
        super(TestCaseWithMemoryTransportAndScript, self).setUp()
 
510
        self.script_runner = ScriptRunner()
 
511
        # FIXME: See shelf_ui.Shelver._char_based. This allow using shelve in
 
512
        # scripts while providing a line-based input (better solution in
 
513
        # progress). -- vila 2011-09-28
 
514
        self.overrideEnv('INSIDE_EMACS', '1')
 
515
 
 
516
    def run_script(self, script, null_output_matches_anything=False):
 
517
        return self.script_runner.run_script(self, script,
 
518
                                             null_output_matches_anything=null_output_matches_anything)
 
519
 
 
520
    def run_command(self, cmd, input, output, error):
 
521
        return self.script_runner.run_command(self, cmd, input, output, error)
 
522
 
 
523
 
 
524
class TestCaseWithTransportAndScript(tests.TestCaseWithTransport):
 
525
    """Helper class to quickly define shell-like tests.
 
526
 
 
527
    Can be used as:
 
528
 
 
529
    from breezy.tests import script
 
530
 
 
531
 
 
532
    class TestBug(script.TestCaseWithTransportAndScript):
 
533
 
 
534
        def test_bug_nnnnn(self):
 
535
            self.run_script('''
 
536
            $ brz init
 
537
            $ brz do-this
 
538
            # Boom, error
 
539
            ''')
 
540
    """
 
541
 
 
542
    def setUp(self):
 
543
        super(TestCaseWithTransportAndScript, self).setUp()
 
544
        self.script_runner = ScriptRunner()
 
545
        # FIXME: See shelf_ui.Shelver._char_based. This allow using shelve in
 
546
        # scripts while providing a line-based input (better solution in
 
547
        # progress). -- vila 2011-09-28
 
548
        self.overrideEnv('INSIDE_EMACS', '1')
 
549
 
 
550
    def run_script(self, script, null_output_matches_anything=False):
 
551
        return self.script_runner.run_script(self, script,
 
552
                                             null_output_matches_anything=null_output_matches_anything)
 
553
 
 
554
    def run_command(self, cmd, input, output, error):
 
555
        return self.script_runner.run_command(self, cmd, input, output, error)
 
556
 
 
557
 
 
558
def run_script(test_case, script_string, null_output_matches_anything=False):
 
559
    """Run the given script within a testcase"""
 
560
    return ScriptRunner().run_script(test_case, script_string,
 
561
                                     null_output_matches_anything=null_output_matches_anything)