# Copyright (C) 2011, 2016 Canonical Ltd
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
25
class TestCatchingExceptionThread(tests.TestCase):
27
def test_start_and_join_smoke_test(self):
# Going by its name, a smoke test for starting and joining a thread; the only
# statement visible here is the construction of the thread object.
# NOTE(review): the bare integer lines in this block look like line-number
# residue from an extraction step, and the gaps between them suggest that
# statements are missing -- confirm against the original file.
31
# NOTE(review): `do_nothing` is not defined in the visible text, and no
# start()/join() call is visible either; presumably both were dropped.
tt = cethread.CatchingExceptionThread(target=do_nothing)
35
def test_exception_is_re_raised(self):
# Asserts that calling join() on a CatchingExceptionThread whose target is
# raise_my_exception raises MyException in the calling test.
# NOTE(review): the bare integer lines in this block look like line-number
# residue from an extraction step, and the gaps between them suggest that
# statements are missing -- confirm against the original file.
36
# Exception type private to this test (its body is not visible here).
class MyException(Exception):
39
# Thread target. NOTE(review): its body is not visible; presumably it raises
# MyException -- confirm.
def raise_my_exception():
42
tt = cethread.CatchingExceptionThread(target=raise_my_exception)
44
# join is passed uncalled so that assertRaises invokes it and checks the
# exception type.
self.assertRaises(MyException, tt.join)
46
def test_join_around_exception(self):
# Checks two observable states of the thread object: first `tt.exception` is
# None, later `tt.join` raises MyException.
# NOTE(review): the bare integer lines in this block look like line-number
# residue from an extraction step, and the gaps between them suggest that
# statements are missing -- confirm against the original file.
47
# Event created for this test. NOTE(review): the statements that wait on it
# or set it are not visible; per the comment inside raise_my_exception it is
# presumably what the thread waits on before raising -- confirm.
resume = threading.Event()
49
# Exception type private to this test (its body is not visible here).
class MyException(Exception):
52
# Thread target; only its leading comment survives in this view.
def raise_my_exception():
53
# Wait for the test to tell us to resume
58
tt = cethread.CatchingExceptionThread(target=raise_my_exception)
61
# At this point no exception is recorded on the thread object.
self.assertIs(None, tt.exception)
63
# join is passed uncalled so that assertRaises invokes it and checks the
# exception type.
self.assertRaises(MyException, tt.join)
65
def test_sync_event(self):
# Checks that the thread exposes the `in_thread` event as its `sync_event`,
# that join() raises MyException, and that the sync event is set afterwards.
# NOTE(review): the bare integer lines in this block look like line-number
# residue from an extraction step, and the gaps between them suggest that
# statements are missing -- confirm against the original file.
66
# NOTE(review): no use of `control` is visible in this block; presumably the
# thread target waits on it -- confirm.
control = threading.Event()
67
# The event later compared by identity with tt.sync_event.
in_thread = threading.Event()
69
# Exception type private to this test (its body is not visible here).
class MyException(Exception):
72
# Thread target; only its leading comment survives in this view.
def raise_my_exception():
73
# Wait for the test to tell us to resume
78
# NOTE(review): this call is cut off after its first argument; the
# continuation line is missing. Given the identity assertion on
# tt.sync_event below, it presumably passes `in_thread` as the sync event
# -- confirm.
tt = cethread.CatchingExceptionThread(target=raise_my_exception,
82
# At this point no exception is recorded on the thread object.
self.assertIs(None, tt.exception)
83
# Identity, not equality: the thread must hold the very same Event object.
self.assertIs(in_thread, tt.sync_event)
85
self.assertRaises(MyException, tt.join)
86
# After the join attempt the sync event is expected to be set.
# NOTE(review): isSet() is the legacy camelCase alias of Event.is_set().
self.assertEqual(True, tt.sync_event.isSet())
88
def test_switch_and_set(self):
# Drives a TestThread through named stages and asserts that `current_step`
# reads 'starting', 'step1', 'step2' and finally 'done', in that order.
# NOTE(review): the bare integer lines in this block look like line-number
# residue from an extraction step, and the gaps between them suggest that
# statements are missing -- confirm against the original file.
89
"""Caller can precisely control a thread."""
90
# NOTE(review): no use of control1/control2/control3 is visible in this
# block; presumably step_by_step waits on them between stages -- confirm.
control1 = threading.Event()
91
control2 = threading.Event()
92
control3 = threading.Event()
94
# Thread subclass whose target is its own step_by_step method.
class TestThread(cethread.CatchingExceptionThread):
97
# NOTE(review): the `def __init__` header for the following statements is not
# visible; they reference `self`, so they are presumably the constructor
# body -- confirm.
super(TestThread, self).__init__(target=self.step_by_step)
98
self.current_step = 'starting'
99
self.step1 = threading.Event()
100
# step1 is installed as the thread's initial sync event.
self.set_sync_event(self.step1)
101
self.step2 = threading.Event()
102
self.final = threading.Event()
104
# Thread body: records each stage in current_step and calls switch_and_set
# with step2 and then with final.
def step_by_step(self):
106
self.current_step = 'step1'
107
self.switch_and_set(self.step2)
109
self.current_step = 'step2'
110
self.switch_and_set(self.final)
112
self.current_step = 'done'
116
# NOTE(review): the statement that creates `tt`, and whatever sets or waits
# on events between the assertions below, is not visible.
self.assertEqual('starting', tt.current_step)
119
self.assertEqual('step1', tt.current_step)
122
self.assertEqual('step2', tt.current_step)
124
# We don't wait on tt.final
126
self.assertEqual('done', tt.current_step)
128
def test_exception_while_switch_and_set(self):
# Uses a TestThread whose set_sync_event override special-cases step2, then
# asserts that pending_exception raises MyException, that the thread's
# sync_event is still step1, and that step1 is set.
# NOTE(review): the bare integer lines in this block look like line-number
# residue from an extraction step, and the gaps between them suggest that
# statements are missing -- confirm against the original file.
129
# NOTE(review): no use of `control1` is visible in this block -- confirm how
# it is used.
control1 = threading.Event()
131
# Exception type private to this test (its body is not visible here).
class MyException(Exception):
134
# Thread subclass whose target is its own step_by_step method.
class TestThread(cethread.CatchingExceptionThread):
136
def __init__(self, *args, **kwargs):
137
# The events are created before the base-class constructor is called,
# because step1 is passed to it as `sync_event`.
self.step1 = threading.Event()
138
self.step2 = threading.Event()
139
super(TestThread, self).__init__(target=self.step_by_step,
140
sync_event=self.step1)
141
self.current_step = 'starting'
142
# step1 is installed again through the overridden set_sync_event, in
# addition to being passed to the base-class constructor above.
self.set_sync_event(self.step1)
144
# Thread body: records 'step1' and then asks to switch to step2.
def step_by_step(self):
146
self.current_step = 'step1'
147
self.switch_and_set(self.step2)
149
# Override that treats step2 specially and otherwise reaches the base-class
# implementation.
def set_sync_event(self, event):
150
# We force an exception while trying to set step2
151
# NOTE(review): the body of this `if` is not visible; per the comment above
# it presumably raises MyException -- confirm.
if event is self.step2:
153
# NOTE(review): with indentation lost it cannot be told whether this call
# sits in an else-branch or follows the `if` unconditionally.
super(TestThread, self).set_sync_event(event)
157
# NOTE(review): the statement that creates `tt` is not visible.
self.assertEqual('starting', tt.current_step)
159
# We now wait on step1 which will be set when catching the exception
161
# pending_exception is passed uncalled so that assertRaises invokes it.
self.assertRaises(MyException, tt.pending_exception)
162
# The sync event must still be the very same step1 object.
self.assertIs(tt.step1, tt.sync_event)
163
# NOTE(review): isSet() is the legacy camelCase alias of Event.is_set().
self.assertTrue(tt.step1.isSet())