/brz/remove-bazaar

To get this branch, use:
bzr branch http://gegoxaren.bato24.eu/bzr/brz/remove-bazaar

« back to all changes in this revision

Viewing changes to breezy/tests/test_cleanup.py

  • Committer: Jelmer Vernooij
  • Date: 2019-09-29 12:45:26 UTC
  • mto: This revision was merged to the branch mainline in revision 7402.
  • Revision ID: jelmer@jelmer.uk-20190929124526-f2o8u8rk883l2g8o
Remove unused _iter_children function.

Show diffs side-by-side

added added

removed removed

Lines of Context:
14
14
# along with this program; if not, write to the Free Software
15
15
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
16
16
 
17
 
from cStringIO import StringIO
18
17
import re
19
18
 
20
 
from bzrlib.cleanup import (
21
 
    _do_with_cleanups,
22
 
    _run_cleanup,
23
 
    OperationWithCleanups,
24
 
    )
25
 
from bzrlib.tests import TestCase
26
 
from bzrlib import (
27
 
    debug,
28
 
    trace,
29
 
    )
30
 
 
31
 
 
32
 
class CleanupsTestCase(TestCase):
33
 
 
34
 
    def setUp(self):
35
 
        super(CleanupsTestCase, self).setUp()
36
 
        self.call_log = []
37
 
 
38
 
    def no_op_cleanup(self):
39
 
        self.call_log.append('no_op_cleanup')
40
 
 
41
 
    def assertLogContains(self, regex):
42
 
        self.assertContainsRe(self.get_log(), regex, re.DOTALL)
43
 
 
44
 
    def failing_cleanup(self):
45
 
        self.call_log.append('failing_cleanup')
46
 
        raise Exception("failing_cleanup goes boom!")
47
 
 
48
 
 
49
 
class TestRunCleanup(CleanupsTestCase):
50
 
 
51
 
    def test_no_errors(self):
52
 
        """The function passed to _run_cleanup is run."""
53
 
        self.assertTrue(_run_cleanup(self.no_op_cleanup))
54
 
        self.assertEqual(['no_op_cleanup'], self.call_log)
55
 
 
56
 
    def test_cleanup_with_args_kwargs(self):
57
 
        def func_taking_args_kwargs(*args, **kwargs):
58
 
            self.call_log.append(('func', args, kwargs))
59
 
        _run_cleanup(func_taking_args_kwargs, 'an arg', kwarg='foo')
60
 
        self.assertEqual(
61
 
            [('func', ('an arg',), {'kwarg': 'foo'})], self.call_log)
62
 
 
63
 
    def test_cleanup_error(self):
64
 
        """An error from the cleanup function is logged by _run_cleanup, but not
65
 
        propagated.
66
 
 
67
 
        This is there's no way for _run_cleanup to know if there's an existing
68
 
        exception in this situation::
69
 
            try:
70
 
              some_func()
71
 
            finally:
72
 
              _run_cleanup(cleanup_func)
73
 
        So, the best _run_cleanup can do is always log errors but never raise
74
 
        them.
75
 
        """
76
 
        self.assertFalse(_run_cleanup(self.failing_cleanup))
77
 
        self.assertLogContains('Cleanup failed:.*failing_cleanup goes boom')
78
 
 
79
 
    def test_cleanup_error_debug_flag(self):
80
 
        """The -Dcleanup debug flag causes cleanup errors to be reported to the
81
 
        user.
82
 
        """
83
 
        log = StringIO()
84
 
        trace.push_log_file(log)
85
 
        debug.debug_flags.add('cleanup')
86
 
        self.assertFalse(_run_cleanup(self.failing_cleanup))
87
 
        self.assertContainsRe(
88
 
            log.getvalue(),
89
 
            "bzr: warning: Cleanup failed:.*failing_cleanup goes boom")
90
 
 
91
 
    def test_prior_error_cleanup_succeeds(self):
92
 
        """Calling _run_cleanup from a finally block will not interfere with an
93
 
        exception from the try block.
94
 
        """
95
 
        def failing_operation():
96
 
            try:
97
 
                1/0
98
 
            finally:
99
 
                _run_cleanup(self.no_op_cleanup)
100
 
        self.assertRaises(ZeroDivisionError, failing_operation)
101
 
        self.assertEqual(['no_op_cleanup'], self.call_log)
102
 
 
103
 
    def test_prior_error_cleanup_fails(self):
104
 
        """Calling _run_cleanup from a finally block will not interfere with an
105
 
        exception from the try block even when the cleanup itself raises an
106
 
        exception.
107
 
 
108
 
        The cleanup exception will be logged.
109
 
        """
110
 
        def failing_operation():
111
 
            try:
112
 
                1/0
113
 
            finally:
114
 
                _run_cleanup(self.failing_cleanup)
115
 
        self.assertRaises(ZeroDivisionError, failing_operation)
116
 
        self.assertLogContains('Cleanup failed:.*failing_cleanup goes boom')
117
 
 
118
 
 
119
 
class TestDoWithCleanups(CleanupsTestCase):
120
 
 
121
 
    def trivial_func(self):
122
 
        self.call_log.append('trivial_func')
123
 
        return 'trivial result'
124
 
 
125
 
    def test_runs_func(self):
126
 
        """_do_with_cleanups runs the function it is given, and returns the
127
 
        result.
128
 
        """
129
 
        result = _do_with_cleanups([], self.trivial_func)
130
 
        self.assertEqual('trivial result', result)
131
 
 
132
 
    def test_runs_cleanups(self):
133
 
        """Cleanup functions are run (in the given order)."""
134
 
        cleanup_func_1 = (self.call_log.append, ('cleanup 1',), {})
135
 
        cleanup_func_2 = (self.call_log.append, ('cleanup 2',), {})
136
 
        _do_with_cleanups([cleanup_func_1, cleanup_func_2], self.trivial_func)
137
 
        self.assertEqual(
138
 
            ['trivial_func', 'cleanup 1', 'cleanup 2'], self.call_log)
139
 
 
140
 
    def failing_func(self):
141
 
        self.call_log.append('failing_func')
142
 
        1/0
143
 
 
144
 
    def test_func_error_propagates(self):
145
 
        """Errors from the main function are propagated (after running
146
 
        cleanups).
147
 
        """
148
 
        self.assertRaises(
149
 
            ZeroDivisionError, _do_with_cleanups,
150
 
            [(self.no_op_cleanup, (), {})], self.failing_func)
151
 
        self.assertEqual(['failing_func', 'no_op_cleanup'], self.call_log)
152
 
 
153
 
    def test_func_error_trumps_cleanup_error(self):
154
 
        """Errors from the main function a propagated even if a cleanup raises
155
 
        an error.
156
 
 
157
 
        The cleanup error is be logged.
158
 
        """
159
 
        self.assertRaises(
160
 
            ZeroDivisionError, _do_with_cleanups,
161
 
            [(self.failing_cleanup, (), {})], self.failing_func)
162
 
        self.assertLogContains('Cleanup failed:.*failing_cleanup goes boom')
163
 
 
164
 
    def test_func_passes_and_error_from_cleanup(self):
165
 
        """An error from a cleanup is propagated when the main function doesn't
166
 
        raise an error.  Later cleanups are still executed.
167
 
        """
168
 
        exc = self.assertRaises(
169
 
            Exception, _do_with_cleanups,
170
 
            [(self.failing_cleanup, (), {}), (self.no_op_cleanup, (), {})],
171
 
            self.trivial_func)
172
 
        self.assertEqual('failing_cleanup goes boom!', exc.args[0])
173
 
        self.assertEqual(
174
 
            ['trivial_func', 'failing_cleanup', 'no_op_cleanup'],
175
 
            self.call_log)
176
 
 
177
 
    def test_multiple_cleanup_failures(self):
178
 
        """When multiple cleanups fail (as tends to happen when something has
179
 
        gone wrong), the first error is propagated, and subsequent errors are
180
 
        logged.
181
 
        """
182
 
        cleanups = self.make_two_failing_cleanup_funcs()
183
 
        self.assertRaises(ErrorA, _do_with_cleanups, cleanups,
184
 
            self.trivial_func)
185
 
        self.assertLogContains('Cleanup failed:.*ErrorB')
186
 
        self.assertFalse('ErrorA' in self.get_log())
187
 
 
188
 
    def make_two_failing_cleanup_funcs(self):
189
 
        def raise_a():
190
 
            raise ErrorA('Error A')
191
 
        def raise_b():
192
 
            raise ErrorB('Error B')
193
 
        return [(raise_a, (), {}), (raise_b, (), {})]
194
 
 
195
 
    def test_multiple_cleanup_failures_debug_flag(self):
196
 
        log = StringIO()
197
 
        trace.push_log_file(log)
198
 
        debug.debug_flags.add('cleanup')
199
 
        cleanups = self.make_two_failing_cleanup_funcs()
200
 
        self.assertRaises(ErrorA, _do_with_cleanups, cleanups,
201
 
            self.trivial_func)
202
 
        self.assertContainsRe(
203
 
            log.getvalue(), "bzr: warning: Cleanup failed:.*Error B\n")
204
 
        self.assertEqual(1, log.getvalue().count('bzr: warning:'),
205
 
                log.getvalue())
206
 
 
207
 
    def test_func_and_cleanup_errors_debug_flag(self):
208
 
        log = StringIO()
209
 
        trace.push_log_file(log)
210
 
        debug.debug_flags.add('cleanup')
211
 
        cleanups = self.make_two_failing_cleanup_funcs()
212
 
        self.assertRaises(ZeroDivisionError, _do_with_cleanups, cleanups,
213
 
            self.failing_func)
214
 
        self.assertContainsRe(
215
 
            log.getvalue(), "bzr: warning: Cleanup failed:.*Error A\n")
216
 
        self.assertContainsRe(
217
 
            log.getvalue(), "bzr: warning: Cleanup failed:.*Error B\n")
218
 
        self.assertEqual(2, log.getvalue().count('bzr: warning:'))
219
 
 
220
 
    def test_func_may_mutate_cleanups(self):
221
 
        """The main func may mutate the cleanups before it returns.
222
 
        
223
 
        This allows a function to gradually add cleanups as it acquires
224
 
        resources, rather than planning all the cleanups up-front.  The
225
 
        OperationWithCleanups helper relies on this working.
226
 
        """
227
 
        cleanups_list = []
228
 
        def func_that_adds_cleanups():
229
 
            self.call_log.append('func_that_adds_cleanups')
230
 
            cleanups_list.append((self.no_op_cleanup, (), {}))
231
 
            return 'result'
232
 
        result = _do_with_cleanups(cleanups_list, func_that_adds_cleanups)
233
 
        self.assertEqual('result', result)
234
 
        self.assertEqual(
235
 
            ['func_that_adds_cleanups', 'no_op_cleanup'], self.call_log)
236
 
 
237
 
    def test_cleanup_error_debug_flag(self):
238
 
        """The -Dcleanup debug flag causes cleanup errors to be reported to the
239
 
        user.
240
 
        """
241
 
        log = StringIO()
242
 
        trace.push_log_file(log)
243
 
        debug.debug_flags.add('cleanup')
244
 
        self.assertRaises(ZeroDivisionError, _do_with_cleanups,
245
 
            [(self.failing_cleanup, (), {})], self.failing_func)
246
 
        self.assertContainsRe(
247
 
            log.getvalue(),
248
 
            "bzr: warning: Cleanup failed:.*failing_cleanup goes boom")
249
 
        self.assertEqual(1, log.getvalue().count('bzr: warning:'))
250
 
 
251
 
 
252
 
class ErrorA(Exception): pass
253
 
class ErrorB(Exception): pass
254
 
 
255
 
 
256
 
class TestOperationWithCleanups(CleanupsTestCase):
257
 
 
258
 
    def test_cleanup_ordering(self):
259
 
        """Cleanups are added in LIFO order.
260
 
 
261
 
        So cleanups added before run is called are run last, and the last
262
 
        cleanup added during the func is run first.
263
 
        """
264
 
        call_log = []
265
 
        def func(op, foo):
266
 
            call_log.append(('func called', foo))
267
 
            op.add_cleanup(call_log.append, 'cleanup 2')
268
 
            op.add_cleanup(call_log.append, 'cleanup 1')
269
 
            return 'result'
270
 
        owc = OperationWithCleanups(func)
271
 
        owc.add_cleanup(call_log.append, 'cleanup 4')
272
 
        owc.add_cleanup(call_log.append, 'cleanup 3')
273
 
        result = owc.run('foo')
274
 
        self.assertEqual('result', result)
275
 
        self.assertEqual(
276
 
            [('func called', 'foo'), 'cleanup 1', 'cleanup 2', 'cleanup 3',
277
 
            'cleanup 4'], call_log)
278
 
 
 
19
from ..cleanup import (
 
20
    ExitStack,
 
21
    )
 
22
from ..sixish import PY3
 
23
from .. import (
 
24
    tests,
 
25
    )
 
26
 
 
27
from contextlib import contextmanager
 
28
 
 
29
 
 
30
check_exception_chaining = PY3
 
31
 
 
32
 
 
33
# Imported from contextlib2's test_contextlib2.py
 
34
# Copyright: Copyright (c) 2001, 2002, 2003, 2004, 2005, 2006, 2007, 2008,
 
35
#   2009, 2010, 2011 Python Software Foundation
 
36
#
 
37
# PYTHON SOFTWARE FOUNDATION LICENSE VERSION 2
 
38
# --------------------------------------------
 
39
# .
 
40
# 1. This LICENSE AGREEMENT is between the Python Software Foundation
 
41
# ("PSF"), and the Individual or Organization ("Licensee") accessing and
 
42
# otherwise using this software ("Python") in source or binary form and
 
43
# its associated documentation.
 
44
# .
 
45
# 2. Subject to the terms and conditions of this License Agreement, PSF hereby
 
46
# grants Licensee a nonexclusive, royalty-free, world-wide license to reproduce,
 
47
# analyze, test, perform and/or display publicly, prepare derivative works,
 
48
# distribute, and otherwise use Python alone or in any derivative version,
 
49
# provided, however, that PSF's License Agreement and PSF's notice of copyright,
 
50
# i.e., "Copyright (c) 2001, 2002, 2003, 2004, 2005, 2006, 2007, 2008, 2009, 2010,
 
51
# 2011 Python Software Foundation; All Rights Reserved" are retained in Python
 
52
# alone or in any derivative version prepared by Licensee.
 
53
# .
 
54
# 3. In the event Licensee prepares a derivative work that is based on
 
55
# or incorporates Python or any part thereof, and wants to make
 
56
# the derivative work available to others as provided herein, then
 
57
# Licensee hereby agrees to include in any such work a brief summary of
 
58
# the changes made to Python.
 
59
# .
 
60
# 4. PSF is making Python available to Licensee on an "AS IS"
 
61
# basis.  PSF MAKES NO REPRESENTATIONS OR WARRANTIES, EXPRESS OR
 
62
# IMPLIED.  BY WAY OF EXAMPLE, BUT NOT LIMITATION, PSF MAKES NO AND
 
63
# DISCLAIMS ANY REPRESENTATION OR WARRANTY OF MERCHANTABILITY OR FITNESS
 
64
# FOR ANY PARTICULAR PURPOSE OR THAT THE USE OF PYTHON WILL NOT
 
65
# INFRINGE ANY THIRD PARTY RIGHTS.
 
66
# .
 
67
# 5. PSF SHALL NOT BE LIABLE TO LICENSEE OR ANY OTHER USERS OF PYTHON
 
68
# FOR ANY INCIDENTAL, SPECIAL, OR CONSEQUENTIAL DAMAGES OR LOSS AS
 
69
# A RESULT OF MODIFYING, DISTRIBUTING, OR OTHERWISE USING PYTHON,
 
70
# OR ANY DERIVATIVE THEREOF, EVEN IF ADVISED OF THE POSSIBILITY THEREOF.
 
71
# .
 
72
# 6. This License Agreement will automatically terminate upon a material
 
73
# breach of its terms and conditions.
 
74
# .
 
75
# 7. Nothing in this License Agreement shall be deemed to create any
 
76
# relationship of agency, partnership, or joint venture between PSF and
 
77
# Licensee.  This License Agreement does not grant permission to use PSF
 
78
# trademarks or trade name in a trademark sense to endorse or promote
 
79
# products or services of Licensee, or any third party.
 
80
# .
 
81
# 8. By copying, installing or otherwise using Python, Licensee
 
82
# agrees to be bound by the terms and conditions of this License
 
83
# Agreement.
 
84
 
 
85
 
 
86
class TestExitStack(tests.TestCase):
 
87
 
 
88
    def test_no_resources(self):
 
89
        with ExitStack():
 
90
            pass
 
91
 
 
92
    def test_callback(self):
 
93
        expected = [
 
94
            ((), {}),
 
95
            ((1,), {}),
 
96
            ((1, 2), {}),
 
97
            ((), dict(example=1)),
 
98
            ((1,), dict(example=1)),
 
99
            ((1, 2), dict(example=1)),
 
100
        ]
 
101
        result = []
 
102
        def _exit(*args, **kwds):
 
103
            """Test metadata propagation"""
 
104
            result.append((args, kwds))
 
105
        with ExitStack() as stack:
 
106
            for args, kwds in reversed(expected):
 
107
                if args and kwds:
 
108
                    f = stack.callback(_exit, *args, **kwds)
 
109
                elif args:
 
110
                    f = stack.callback(_exit, *args)
 
111
                elif kwds:
 
112
                    f = stack.callback(_exit, **kwds)
 
113
                else:
 
114
                    f = stack.callback(_exit)
 
115
                self.assertIs(f, _exit)
 
116
        self.assertEqual(result, expected)
 
117
 
 
118
    def test_push(self):
 
119
        exc_raised = ZeroDivisionError
 
120
        def _expect_exc(exc_type, exc, exc_tb):
 
121
            self.assertIs(exc_type, exc_raised)
 
122
        def _suppress_exc(*exc_details):
 
123
            return True
 
124
        def _expect_ok(exc_type, exc, exc_tb):
 
125
            self.assertIsNone(exc_type)
 
126
            self.assertIsNone(exc)
 
127
            self.assertIsNone(exc_tb)
 
128
        class ExitCM(object):
 
129
            def __init__(self, check_exc):
 
130
                self.check_exc = check_exc
 
131
            def __enter__(self):
 
132
                self.fail("Should not be called!")
 
133
            def __exit__(self, *exc_details):
 
134
                self.check_exc(*exc_details)
 
135
        with ExitStack() as stack:
 
136
            stack.push(_expect_ok)
 
137
            cm = ExitCM(_expect_ok)
 
138
            stack.push(cm)
 
139
            stack.push(_suppress_exc)
 
140
            cm = ExitCM(_expect_exc)
 
141
            stack.push(cm)
 
142
            stack.push(_expect_exc)
 
143
            stack.push(_expect_exc)
 
144
            1 / 0
 
145
 
 
146
    def test_enter_context(self):
 
147
        class TestCM(object):
 
148
            def __enter__(self):
 
149
                result.append(1)
 
150
            def __exit__(self, *exc_details):
 
151
                result.append(3)
 
152
 
 
153
        result = []
 
154
        cm = TestCM()
 
155
        with ExitStack() as stack:
 
156
            @stack.callback  # Registered first => cleaned up last
 
157
            def _exit():
 
158
                result.append(4)
 
159
            self.assertIsNotNone(_exit)
 
160
            stack.enter_context(cm)
 
161
            result.append(2)
 
162
        self.assertEqual(result, [1, 2, 3, 4])
 
163
 
 
164
    def test_close(self):
 
165
        result = []
 
166
        with ExitStack() as stack:
 
167
            @stack.callback
 
168
            def _exit():
 
169
                result.append(1)
 
170
            self.assertIsNotNone(_exit)
 
171
            stack.close()
 
172
            result.append(2)
 
173
        self.assertEqual(result, [1, 2])
 
174
 
 
175
    def test_pop_all(self):
 
176
        result = []
 
177
        with ExitStack() as stack:
 
178
            @stack.callback
 
179
            def _exit():
 
180
                result.append(3)
 
181
            self.assertIsNotNone(_exit)
 
182
            new_stack = stack.pop_all()
 
183
            result.append(1)
 
184
        result.append(2)
 
185
        new_stack.close()
 
186
        self.assertEqual(result, [1, 2, 3])
 
187
 
 
188
    def test_exit_raise(self):
 
189
        def _raise():
 
190
            with ExitStack() as stack:
 
191
                stack.push(lambda *exc: False)
 
192
                1 / 0
 
193
        self.assertRaises(ZeroDivisionError, _raise)
 
194
 
 
195
    def test_exit_suppress(self):
 
196
        with ExitStack() as stack:
 
197
            stack.push(lambda *exc: True)
 
198
            1 / 0
 
199
 
 
200
    def test_exit_exception_chaining_reference(self):
 
201
        # Sanity check to make sure that ExitStack chaining matches
 
202
        # actual nested with statements
 
203
        class RaiseExc:
 
204
            def __init__(self, exc):
 
205
                self.exc = exc
 
206
            def __enter__(self):
 
207
                return self
 
208
            def __exit__(self, *exc_details):
 
209
                raise self.exc
 
210
 
 
211
        class RaiseExcWithContext:
 
212
            def __init__(self, outer, inner):
 
213
                self.outer = outer
 
214
                self.inner = inner
 
215
            def __enter__(self):
 
216
                return self
 
217
            def __exit__(self, *exc_details):
 
218
                try:
 
219
                    raise self.inner
 
220
                except:
 
221
                    raise self.outer
 
222
 
 
223
        class SuppressExc:
 
224
            def __enter__(self):
 
225
                return self
 
226
            def __exit__(self, *exc_details):
 
227
                self.__class__.saved_details = exc_details
 
228
                return True
 
229
 
 
230
        try:
 
231
            with RaiseExc(IndexError):
 
232
                with RaiseExcWithContext(KeyError, AttributeError):
 
233
                    with SuppressExc():
 
234
                        with RaiseExc(ValueError):
 
235
                            1 / 0
 
236
        except IndexError as exc:
 
237
            if check_exception_chaining:
 
238
                self.assertIsInstance(exc.__context__, KeyError)
 
239
                self.assertIsInstance(exc.__context__.__context__, AttributeError)
 
240
                # Inner exceptions were suppressed
 
241
                self.assertIsNone(exc.__context__.__context__.__context__)
 
242
        else:
 
243
            self.fail("Expected IndexError, but no exception was raised")
 
244
        # Check the inner exceptions
 
245
        inner_exc = SuppressExc.saved_details[1]
 
246
        self.assertIsInstance(inner_exc, ValueError)
 
247
        if check_exception_chaining:
 
248
            self.assertIsInstance(inner_exc.__context__, ZeroDivisionError)
 
249
 
 
250
    def test_exit_exception_chaining(self):
 
251
        # Ensure exception chaining matches the reference behaviour
 
252
        def raise_exc(exc):
 
253
            raise exc
 
254
 
 
255
        saved_details = [None]
 
256
        def suppress_exc(*exc_details):
 
257
            saved_details[0] = exc_details
 
258
            return True
 
259
 
 
260
        try:
 
261
            with ExitStack() as stack:
 
262
                stack.callback(raise_exc, IndexError)
 
263
                stack.callback(raise_exc, KeyError)
 
264
                stack.callback(raise_exc, AttributeError)
 
265
                stack.push(suppress_exc)
 
266
                stack.callback(raise_exc, ValueError)
 
267
                1 / 0
 
268
        except IndexError as exc:
 
269
            if check_exception_chaining:
 
270
                self.assertIsInstance(exc.__context__, KeyError)
 
271
                self.assertIsInstance(exc.__context__.__context__, AttributeError)
 
272
                # Inner exceptions were suppressed
 
273
                self.assertIsNone(exc.__context__.__context__.__context__)
 
274
        else:
 
275
            self.fail("Expected IndexError, but no exception was raised")
 
276
        # Check the inner exceptions
 
277
        inner_exc = saved_details[0][1]
 
278
        self.assertIsInstance(inner_exc, ValueError)
 
279
        if check_exception_chaining:
 
280
            self.assertIsInstance(inner_exc.__context__, ZeroDivisionError)
 
281
 
 
282
    def test_exit_exception_non_suppressing(self):
 
283
        # http://bugs.python.org/issue19092
 
284
        def raise_exc(exc):
 
285
            raise exc
 
286
 
 
287
        def suppress_exc(*exc_details):
 
288
            return True
 
289
 
 
290
        try:
 
291
            with ExitStack() as stack:
 
292
                stack.callback(lambda: None)
 
293
                stack.callback(raise_exc, IndexError)
 
294
        except Exception as exc:
 
295
            self.assertIsInstance(exc, IndexError)
 
296
        else:
 
297
            self.fail("Expected IndexError, but no exception was raised")
 
298
 
 
299
        try:
 
300
            with ExitStack() as stack:
 
301
                stack.callback(raise_exc, KeyError)
 
302
                stack.push(suppress_exc)
 
303
                stack.callback(raise_exc, IndexError)
 
304
        except Exception as exc:
 
305
            self.assertIsInstance(exc, KeyError)
 
306
        else:
 
307
            self.fail("Expected KeyError, but no exception was raised")
 
308
 
 
309
    def test_exit_exception_with_correct_context(self):
 
310
        # http://bugs.python.org/issue20317
 
311
        @contextmanager
 
312
        def gets_the_context_right(exc):
 
313
            try:
 
314
                yield
 
315
            finally:
 
316
                raise exc
 
317
 
 
318
        exc1 = Exception(1)
 
319
        exc2 = Exception(2)
 
320
        exc3 = Exception(3)
 
321
        exc4 = Exception(4)
 
322
 
 
323
        # The contextmanager already fixes the context, so prior to the
 
324
        # fix, ExitStack would try to fix it *again* and get into an
 
325
        # infinite self-referential loop
 
326
        try:
 
327
            with ExitStack() as stack:
 
328
                stack.enter_context(gets_the_context_right(exc4))
 
329
                stack.enter_context(gets_the_context_right(exc3))
 
330
                stack.enter_context(gets_the_context_right(exc2))
 
331
                raise exc1
 
332
        except Exception as exc:
 
333
            self.assertIs(exc, exc4)
 
334
            if check_exception_chaining:
 
335
                self.assertIs(exc.__context__, exc3)
 
336
                self.assertIs(exc.__context__.__context__, exc2)
 
337
                self.assertIs(exc.__context__.__context__.__context__, exc1)
 
338
                self.assertIsNone(
 
339
                    exc.__context__.__context__.__context__.__context__)
 
340
 
 
341
    def test_exit_exception_with_existing_context(self):
 
342
        # Addresses a lack of test coverage discovered after checking in a
 
343
        # fix for issue 20317 that still contained debugging code.
 
344
        def raise_nested(inner_exc, outer_exc):
 
345
            try:
 
346
                raise inner_exc
 
347
            finally:
 
348
                raise outer_exc
 
349
        exc1 = Exception(1)
 
350
        exc2 = Exception(2)
 
351
        exc3 = Exception(3)
 
352
        exc4 = Exception(4)
 
353
        exc5 = Exception(5)
 
354
        try:
 
355
            with ExitStack() as stack:
 
356
                stack.callback(raise_nested, exc4, exc5)
 
357
                stack.callback(raise_nested, exc2, exc3)
 
358
                raise exc1
 
359
        except Exception as exc:
 
360
            self.assertIs(exc, exc5)
 
361
            if check_exception_chaining:
 
362
                self.assertIs(exc.__context__, exc4)
 
363
                self.assertIs(exc.__context__.__context__, exc3)
 
364
                self.assertIs(exc.__context__.__context__.__context__, exc2)
 
365
                self.assertIs(
 
366
                    exc.__context__.__context__.__context__.__context__, exc1)
 
367
                self.assertIsNone(
 
368
                    exc.__context__.__context__.__context__.__context__.__context__)
 
369
 
 
370
    def test_body_exception_suppress(self):
 
371
        def suppress_exc(*exc_details):
 
372
            return True
 
373
        try:
 
374
            with ExitStack() as stack:
 
375
                stack.push(suppress_exc)
 
376
                1 / 0
 
377
        except IndexError as exc:
 
378
            self.fail("Expected no exception, got IndexError")
 
379
 
 
380
    def test_exit_exception_chaining_suppress(self):
 
381
        with ExitStack() as stack:
 
382
            stack.push(lambda *exc: True)
 
383
            stack.push(lambda *exc: 1 / 0)
 
384
            stack.push(lambda *exc: {}[1])
 
385
 
 
386
    def test_excessive_nesting(self):
 
387
        # The original implementation would die with RecursionError here
 
388
        with ExitStack() as stack:
 
389
            for i in range(10000):
 
390
                stack.callback(int)
 
391
 
 
392
    def test_instance_bypass(self):
 
393
        class Example(object):
 
394
            pass
 
395
        cm = Example()
 
396
        cm.__exit__ = object()
 
397
        stack = ExitStack()
 
398
        self.assertRaises(AttributeError, stack.enter_context, cm)
 
399
        stack.push(cm)
 
400
        # self.assertIs(stack._exit_callbacks[-1], cm)