14
14
# along with this program; if not, write to the Free Software
15
15
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
17
from cStringIO import StringIO
19
from ..cleanup import (
22
from ..sixish import PY3
27
from contextlib import contextmanager
30
check_exception_chaining = PY3
33
# Imported from contextlib2's test_contextlib2.py
34
# Copyright: Copyright (c) 2001, 2002, 2003, 2004, 2005, 2006, 2007, 2008,
35
# 2009, 2010, 2011 Python Software Foundation
37
# PYTHON SOFTWARE FOUNDATION LICENSE VERSION 2
38
# --------------------------------------------
40
# 1. This LICENSE AGREEMENT is between the Python Software Foundation
41
# ("PSF"), and the Individual or Organization ("Licensee") accessing and
42
# otherwise using this software ("Python") in source or binary form and
43
# its associated documentation.
45
# 2. Subject to the terms and conditions of this License Agreement, PSF hereby
46
# grants Licensee a nonexclusive, royalty-free, world-wide license to reproduce,
47
# analyze, test, perform and/or display publicly, prepare derivative works,
48
# distribute, and otherwise use Python alone or in any derivative version,
49
# provided, however, that PSF's License Agreement and PSF's notice of copyright,
50
# i.e., "Copyright (c) 2001, 2002, 2003, 2004, 2005, 2006, 2007, 2008, 2009, 2010,
51
# 2011 Python Software Foundation; All Rights Reserved" are retained in Python
52
# alone or in any derivative version prepared by Licensee.
54
# 3. In the event Licensee prepares a derivative work that is based on
55
# or incorporates Python or any part thereof, and wants to make
56
# the derivative work available to others as provided herein, then
57
# Licensee hereby agrees to include in any such work a brief summary of
58
# the changes made to Python.
60
# 4. PSF is making Python available to Licensee on an "AS IS"
61
# basis. PSF MAKES NO REPRESENTATIONS OR WARRANTIES, EXPRESS OR
62
# IMPLIED. BY WAY OF EXAMPLE, BUT NOT LIMITATION, PSF MAKES NO AND
63
# DISCLAIMS ANY REPRESENTATION OR WARRANTY OF MERCHANTABILITY OR FITNESS
64
# FOR ANY PARTICULAR PURPOSE OR THAT THE USE OF PYTHON WILL NOT
65
# INFRINGE ANY THIRD PARTY RIGHTS.
67
# 5. PSF SHALL NOT BE LIABLE TO LICENSEE OR ANY OTHER USERS OF PYTHON
68
# FOR ANY INCIDENTAL, SPECIAL, OR CONSEQUENTIAL DAMAGES OR LOSS AS
69
# A RESULT OF MODIFYING, DISTRIBUTING, OR OTHERWISE USING PYTHON,
70
# OR ANY DERIVATIVE THEREOF, EVEN IF ADVISED OF THE POSSIBILITY THEREOF.
72
# 6. This License Agreement will automatically terminate upon a material
73
# breach of its terms and conditions.
75
# 7. Nothing in this License Agreement shall be deemed to create any
76
# relationship of agency, partnership, or joint venture between PSF and
77
# Licensee. This License Agreement does not grant permission to use PSF
78
# trademarks or trade name in a trademark sense to endorse or promote
79
# products or services of Licensee, or any third party.
81
# 8. By copying, installing or otherwise using Python, Licensee
82
# agrees to be bound by the terms and conditions of this License
86
class TestExitStack(tests.TestCase):
88
def test_no_resources(self):
92
def test_callback(self):
97
((), dict(example=1)),
98
((1,), dict(example=1)),
99
((1, 2), dict(example=1)),
102
def _exit(*args, **kwds):
103
"""Test metadata propagation"""
104
result.append((args, kwds))
105
with ExitStack() as stack:
106
for args, kwds in reversed(expected):
108
f = stack.callback(_exit, *args, **kwds)
110
f = stack.callback(_exit, *args)
112
f = stack.callback(_exit, **kwds)
114
f = stack.callback(_exit)
115
self.assertIs(f, _exit)
116
self.assertEqual(result, expected)
119
exc_raised = ZeroDivisionError
120
def _expect_exc(exc_type, exc, exc_tb):
121
self.assertIs(exc_type, exc_raised)
122
def _suppress_exc(*exc_details):
124
def _expect_ok(exc_type, exc, exc_tb):
125
self.assertIsNone(exc_type)
126
self.assertIsNone(exc)
127
self.assertIsNone(exc_tb)
128
class ExitCM(object):
129
def __init__(self, check_exc):
130
self.check_exc = check_exc
132
self.fail("Should not be called!")
133
def __exit__(self, *exc_details):
134
self.check_exc(*exc_details)
135
with ExitStack() as stack:
136
stack.push(_expect_ok)
137
cm = ExitCM(_expect_ok)
139
stack.push(_suppress_exc)
140
cm = ExitCM(_expect_exc)
142
stack.push(_expect_exc)
143
stack.push(_expect_exc)
146
def test_enter_context(self):
147
class TestCM(object):
150
def __exit__(self, *exc_details):
155
with ExitStack() as stack:
156
@stack.callback # Registered first => cleaned up last
159
self.assertIsNotNone(_exit)
160
stack.enter_context(cm)
162
self.assertEqual(result, [1, 2, 3, 4])
164
def test_close(self):
166
with ExitStack() as stack:
170
self.assertIsNotNone(_exit)
173
self.assertEqual(result, [1, 2])
175
def test_pop_all(self):
177
with ExitStack() as stack:
181
self.assertIsNotNone(_exit)
182
new_stack = stack.pop_all()
186
self.assertEqual(result, [1, 2, 3])
188
def test_exit_raise(self):
190
with ExitStack() as stack:
191
stack.push(lambda *exc: False)
193
self.assertRaises(ZeroDivisionError, _raise)
195
def test_exit_suppress(self):
196
with ExitStack() as stack:
197
stack.push(lambda *exc: True)
200
def test_exit_exception_chaining_reference(self):
201
# Sanity check to make sure that ExitStack chaining matches
202
# actual nested with statements
204
def __init__(self, exc):
208
def __exit__(self, *exc_details):
211
class RaiseExcWithContext:
212
def __init__(self, outer, inner):
217
def __exit__(self, *exc_details):
226
def __exit__(self, *exc_details):
227
self.__class__.saved_details = exc_details
231
with RaiseExc(IndexError):
232
with RaiseExcWithContext(KeyError, AttributeError):
234
with RaiseExc(ValueError):
236
except IndexError as exc:
237
if check_exception_chaining:
238
self.assertIsInstance(exc.__context__, KeyError)
239
self.assertIsInstance(exc.__context__.__context__, AttributeError)
240
# Inner exceptions were suppressed
241
self.assertIsNone(exc.__context__.__context__.__context__)
243
self.fail("Expected IndexError, but no exception was raised")
244
# Check the inner exceptions
245
inner_exc = SuppressExc.saved_details[1]
246
self.assertIsInstance(inner_exc, ValueError)
247
if check_exception_chaining:
248
self.assertIsInstance(inner_exc.__context__, ZeroDivisionError)
250
def test_exit_exception_chaining(self):
251
# Ensure exception chaining matches the reference behaviour
255
saved_details = [None]
256
def suppress_exc(*exc_details):
257
saved_details[0] = exc_details
261
with ExitStack() as stack:
262
stack.callback(raise_exc, IndexError)
263
stack.callback(raise_exc, KeyError)
264
stack.callback(raise_exc, AttributeError)
265
stack.push(suppress_exc)
266
stack.callback(raise_exc, ValueError)
268
except IndexError as exc:
269
if check_exception_chaining:
270
self.assertIsInstance(exc.__context__, KeyError)
271
self.assertIsInstance(exc.__context__.__context__, AttributeError)
272
# Inner exceptions were suppressed
273
self.assertIsNone(exc.__context__.__context__.__context__)
275
self.fail("Expected IndexError, but no exception was raised")
276
# Check the inner exceptions
277
inner_exc = saved_details[0][1]
278
self.assertIsInstance(inner_exc, ValueError)
279
if check_exception_chaining:
280
self.assertIsInstance(inner_exc.__context__, ZeroDivisionError)
282
def test_exit_exception_non_suppressing(self):
283
# http://bugs.python.org/issue19092
287
def suppress_exc(*exc_details):
291
with ExitStack() as stack:
292
stack.callback(lambda: None)
293
stack.callback(raise_exc, IndexError)
294
except Exception as exc:
295
self.assertIsInstance(exc, IndexError)
297
self.fail("Expected IndexError, but no exception was raised")
300
with ExitStack() as stack:
301
stack.callback(raise_exc, KeyError)
302
stack.push(suppress_exc)
303
stack.callback(raise_exc, IndexError)
304
except Exception as exc:
305
self.assertIsInstance(exc, KeyError)
307
self.fail("Expected KeyError, but no exception was raised")
309
def test_exit_exception_with_correct_context(self):
310
# http://bugs.python.org/issue20317
312
def gets_the_context_right(exc):
323
# The contextmanager already fixes the context, so prior to the
324
# fix, ExitStack would try to fix it *again* and get into an
325
# infinite self-referential loop
327
with ExitStack() as stack:
328
stack.enter_context(gets_the_context_right(exc4))
329
stack.enter_context(gets_the_context_right(exc3))
330
stack.enter_context(gets_the_context_right(exc2))
332
except Exception as exc:
333
self.assertIs(exc, exc4)
334
if check_exception_chaining:
335
self.assertIs(exc.__context__, exc3)
336
self.assertIs(exc.__context__.__context__, exc2)
337
self.assertIs(exc.__context__.__context__.__context__, exc1)
339
exc.__context__.__context__.__context__.__context__)
341
def test_exit_exception_with_existing_context(self):
342
# Addresses a lack of test coverage discovered after checking in a
343
# fix for issue 20317 that still contained debugging code.
344
def raise_nested(inner_exc, outer_exc):
355
with ExitStack() as stack:
356
stack.callback(raise_nested, exc4, exc5)
357
stack.callback(raise_nested, exc2, exc3)
359
except Exception as exc:
360
self.assertIs(exc, exc5)
361
if check_exception_chaining:
362
self.assertIs(exc.__context__, exc4)
363
self.assertIs(exc.__context__.__context__, exc3)
364
self.assertIs(exc.__context__.__context__.__context__, exc2)
366
exc.__context__.__context__.__context__.__context__, exc1)
368
exc.__context__.__context__.__context__.__context__.__context__)
370
def test_body_exception_suppress(self):
371
def suppress_exc(*exc_details):
374
with ExitStack() as stack:
375
stack.push(suppress_exc)
377
except IndexError as exc:
378
self.fail("Expected no exception, got IndexError")
380
def test_exit_exception_chaining_suppress(self):
381
with ExitStack() as stack:
382
stack.push(lambda *exc: True)
383
stack.push(lambda *exc: 1 / 0)
384
stack.push(lambda *exc: {}[1])
386
def test_excessive_nesting(self):
387
# The original implementation would die with RecursionError here
388
with ExitStack() as stack:
389
for i in range(10000):
392
def test_instance_bypass(self):
393
class Example(object):
396
cm.__exit__ = object()
398
self.assertRaises(AttributeError, stack.enter_context, cm)
400
# self.assertIs(stack._exit_callbacks[-1], cm)
20
from bzrlib.cleanup import (
23
OperationWithCleanups,
25
from bzrlib.tests import TestCase
32
class CleanupsTestCase(TestCase):
35
super(CleanupsTestCase, self).setUp()
38
def no_op_cleanup(self):
39
self.call_log.append('no_op_cleanup')
41
def assertLogContains(self, regex):
42
self.assertContainsRe(self.get_log(), regex, re.DOTALL)
44
def failing_cleanup(self):
45
self.call_log.append('failing_cleanup')
46
raise Exception("failing_cleanup goes boom!")
49
class TestRunCleanup(CleanupsTestCase):
51
def test_no_errors(self):
52
"""The function passed to _run_cleanup is run."""
53
self.assertTrue(_run_cleanup(self.no_op_cleanup))
54
self.assertEqual(['no_op_cleanup'], self.call_log)
56
def test_cleanup_with_args_kwargs(self):
57
def func_taking_args_kwargs(*args, **kwargs):
58
self.call_log.append(('func', args, kwargs))
59
_run_cleanup(func_taking_args_kwargs, 'an arg', kwarg='foo')
61
[('func', ('an arg',), {'kwarg': 'foo'})], self.call_log)
63
def test_cleanup_error(self):
64
"""An error from the cleanup function is logged by _run_cleanup, but not
67
This is because there's no way for _run_cleanup to know if there's an existing
68
exception in this situation::
72
_run_cleanup(cleanup_func)
73
So, the best _run_cleanup can do is always log errors but never raise
76
self.assertFalse(_run_cleanup(self.failing_cleanup))
77
self.assertLogContains('Cleanup failed:.*failing_cleanup goes boom')
79
def test_cleanup_error_debug_flag(self):
80
"""The -Dcleanup debug flag causes cleanup errors to be reported to the
84
trace.push_log_file(log)
85
debug.debug_flags.add('cleanup')
86
self.assertFalse(_run_cleanup(self.failing_cleanup))
87
self.assertContainsRe(
89
"bzr: warning: Cleanup failed:.*failing_cleanup goes boom")
91
def test_prior_error_cleanup_succeeds(self):
92
"""Calling _run_cleanup from a finally block will not interfere with an
93
exception from the try block.
95
def failing_operation():
99
_run_cleanup(self.no_op_cleanup)
100
self.assertRaises(ZeroDivisionError, failing_operation)
101
self.assertEqual(['no_op_cleanup'], self.call_log)
103
def test_prior_error_cleanup_fails(self):
104
"""Calling _run_cleanup from a finally block will not interfere with an
105
exception from the try block even when the cleanup itself raises an
108
The cleanup exception will be logged.
110
def failing_operation():
114
_run_cleanup(self.failing_cleanup)
115
self.assertRaises(ZeroDivisionError, failing_operation)
116
self.assertLogContains('Cleanup failed:.*failing_cleanup goes boom')
119
class TestDoWithCleanups(CleanupsTestCase):
121
def trivial_func(self):
122
self.call_log.append('trivial_func')
123
return 'trivial result'
125
def test_runs_func(self):
126
"""_do_with_cleanups runs the function it is given, and returns the
129
result = _do_with_cleanups([], self.trivial_func)
130
self.assertEqual('trivial result', result)
132
def test_runs_cleanups(self):
133
"""Cleanup functions are run (in the given order)."""
134
cleanup_func_1 = (self.call_log.append, ('cleanup 1',), {})
135
cleanup_func_2 = (self.call_log.append, ('cleanup 2',), {})
136
_do_with_cleanups([cleanup_func_1, cleanup_func_2], self.trivial_func)
138
['trivial_func', 'cleanup 1', 'cleanup 2'], self.call_log)
140
def failing_func(self):
141
self.call_log.append('failing_func')
144
def test_func_error_propagates(self):
145
"""Errors from the main function are propagated (after running
149
ZeroDivisionError, _do_with_cleanups,
150
[(self.no_op_cleanup, (), {})], self.failing_func)
151
self.assertEqual(['failing_func', 'no_op_cleanup'], self.call_log)
153
def test_func_error_trumps_cleanup_error(self):
154
"""Errors from the main function are propagated even if a cleanup raises
157
The cleanup error is logged.
160
ZeroDivisionError, _do_with_cleanups,
161
[(self.failing_cleanup, (), {})], self.failing_func)
162
self.assertLogContains('Cleanup failed:.*failing_cleanup goes boom')
164
def test_func_passes_and_error_from_cleanup(self):
165
"""An error from a cleanup is propagated when the main function doesn't
166
raise an error. Later cleanups are still executed.
168
exc = self.assertRaises(
169
Exception, _do_with_cleanups,
170
[(self.failing_cleanup, (), {}), (self.no_op_cleanup, (), {})],
172
self.assertEqual('failing_cleanup goes boom!', exc.args[0])
174
['trivial_func', 'failing_cleanup', 'no_op_cleanup'],
177
def test_multiple_cleanup_failures(self):
178
"""When multiple cleanups fail (as tends to happen when something has
179
gone wrong), the first error is propagated, and subsequent errors are
182
cleanups = self.make_two_failing_cleanup_funcs()
183
self.assertRaises(ErrorA, _do_with_cleanups, cleanups,
185
self.assertLogContains('Cleanup failed:.*ErrorB')
186
self.assertFalse('ErrorA' in self.get_log())
188
def make_two_failing_cleanup_funcs(self):
190
raise ErrorA('Error A')
192
raise ErrorB('Error B')
193
return [(raise_a, (), {}), (raise_b, (), {})]
195
def test_multiple_cleanup_failures_debug_flag(self):
197
trace.push_log_file(log)
198
debug.debug_flags.add('cleanup')
199
cleanups = self.make_two_failing_cleanup_funcs()
200
self.assertRaises(ErrorA, _do_with_cleanups, cleanups,
202
self.assertContainsRe(
203
log.getvalue(), "bzr: warning: Cleanup failed:.*Error B\n")
204
self.assertEqual(1, log.getvalue().count('bzr: warning:'),
207
def test_func_and_cleanup_errors_debug_flag(self):
209
trace.push_log_file(log)
210
debug.debug_flags.add('cleanup')
211
cleanups = self.make_two_failing_cleanup_funcs()
212
self.assertRaises(ZeroDivisionError, _do_with_cleanups, cleanups,
214
self.assertContainsRe(
215
log.getvalue(), "bzr: warning: Cleanup failed:.*Error A\n")
216
self.assertContainsRe(
217
log.getvalue(), "bzr: warning: Cleanup failed:.*Error B\n")
218
self.assertEqual(2, log.getvalue().count('bzr: warning:'))
220
def test_func_may_mutate_cleanups(self):
221
"""The main func may mutate the cleanups before it returns.
223
This allows a function to gradually add cleanups as it acquires
224
resources, rather than planning all the cleanups up-front. The
225
OperationWithCleanups helper relies on this working.
228
def func_that_adds_cleanups():
229
self.call_log.append('func_that_adds_cleanups')
230
cleanups_list.append((self.no_op_cleanup, (), {}))
232
result = _do_with_cleanups(cleanups_list, func_that_adds_cleanups)
233
self.assertEqual('result', result)
235
['func_that_adds_cleanups', 'no_op_cleanup'], self.call_log)
237
def test_cleanup_error_debug_flag(self):
238
"""The -Dcleanup debug flag causes cleanup errors to be reported to the
242
trace.push_log_file(log)
243
debug.debug_flags.add('cleanup')
244
self.assertRaises(ZeroDivisionError, _do_with_cleanups,
245
[(self.failing_cleanup, (), {})], self.failing_func)
246
self.assertContainsRe(
248
"bzr: warning: Cleanup failed:.*failing_cleanup goes boom")
249
self.assertEqual(1, log.getvalue().count('bzr: warning:'))
252
class ErrorA(Exception): pass
253
class ErrorB(Exception): pass
256
class TestOperationWithCleanups(CleanupsTestCase):
258
def test_cleanup_ordering(self):
259
"""Cleanups are added in LIFO order.
261
So cleanups added before run is called are run last, and the last
262
cleanup added during the func is run first.
266
call_log.append(('func called', foo))
267
op.add_cleanup(call_log.append, 'cleanup 2')
268
op.add_cleanup(call_log.append, 'cleanup 1')
270
owc = OperationWithCleanups(func)
271
owc.add_cleanup(call_log.append, 'cleanup 4')
272
owc.add_cleanup(call_log.append, 'cleanup 3')
273
result = owc.run('foo')
274
self.assertEqual('result', result)
276
[('func called', 'foo'), 'cleanup 1', 'cleanup 2', 'cleanup 3',
277
'cleanup 4'], call_log)