/brz/remove-bazaar

To get this branch, use:
bzr branch http://gegoxaren.bato24.eu/bzr/brz/remove-bazaar
6614.1.3 by Vincent Ladeuil
Fix assertEquals being deprecated by using assertEqual.
1
# Copyright (C) 2011, 2016 Canonical Ltd
5652.1.1 by Vincent Ladeuil
Split ThreadWithException out of the tests hierarchy.
2
#
3
# This program is free software; you can redistribute it and/or modify
4
# it under the terms of the GNU General Public License as published by
5
# the Free Software Foundation; either version 2 of the License, or
6
# (at your option) any later version.
7
#
8
# This program is distributed in the hope that it will be useful,
9
# but WITHOUT ANY WARRANTY; without even the implied warranty of
10
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
11
# GNU General Public License for more details.
12
#
13
# You should have received a copy of the GNU General Public License
14
# along with this program; if not, write to the Free Software
15
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
16
17
import threading
18
6624 by Jelmer Vernooij
Merge Python3 porting work ('py3 pokes')
19
from .. import (
5652.1.6 by Vincent Ladeuil
thread is already a python module, avoid confusion and use cethread instead.
20
    cethread,
5652.1.1 by Vincent Ladeuil
Split ThreadWithException out of the tests hierarchy.
21
    tests,
22
    )
23
24
5652.1.2 by Vincent Ladeuil
Use clearer names.
25
class TestCatchingExceptionThread(tests.TestCase):
5652.1.1 by Vincent Ladeuil
Split ThreadWithException out of the tests hierarchy.
26
27
    def test_start_and_join_smoke_test(self):
        """A thread running a trivial target can be started and joined."""
        def noop():
            return None

        worker = cethread.CatchingExceptionThread(target=noop)
        worker.start()
        worker.join()
34
35
    def test_exception_is_re_raised(self):
        """join() raises the exception that was raised by the target."""
        class MyException(Exception):
            pass

        def failing_target():
            raise MyException()

        worker = cethread.CatchingExceptionThread(target=failing_target)
        worker.start()
        # The failure happens in the thread; the test expects to see it
        # when joining.
        self.assertRaises(MyException, worker.join)
45
5652.1.4 by Vincent Ladeuil
Add CatchingExceptionThread.set_and_switch() to avoid race conditions.
46
    def test_join_around_exception(self):
        """No exception is recorded before the target raises; join() then
        raises it once the target has been released.
        """
        may_raise = threading.Event()

        class MyException(Exception):
            pass

        def raise_when_released():
            # Stay blocked until the test releases us, then fail
            may_raise.wait()
            raise MyException()

        worker = cethread.CatchingExceptionThread(target=raise_when_released)
        worker.start()
        # The target is still waiting on may_raise here, so it cannot have
        # raised yet.
        worker.join(timeout=0)
        self.assertIs(None, worker.exception)
        # Let the target proceed and fail
        may_raise.set()
        self.assertRaises(MyException, worker.join)
64
5652.1.4 by Vincent Ladeuil
Add CatchingExceptionThread.set_and_switch() to avoid race conditions.
65
    def test_sync_event(self):
        """The sync_event given at construction is kept and ends up set.

        The thread is built with ``sync_event=in_thread``. While the target
        is still blocked, no exception is recorded and ``tt.sync_event`` is
        that very event. Once the target has raised and join() has re-raised
        the exception, the sync event must be set.
        """
        control = threading.Event()
        in_thread = threading.Event()

        class MyException(Exception):
            pass

        def raise_my_exception():
            # Wait for the test to tell us to resume
            control.wait()
            # Now we can raise
            raise MyException()

        tt = cethread.CatchingExceptionThread(target=raise_my_exception,
                                              sync_event=in_thread)
        tt.start()
        # The target is blocked on 'control', so nothing can have been
        # raised yet.
        tt.join(timeout=0)
        self.assertIs(None, tt.exception)
        self.assertIs(in_thread, tt.sync_event)
        control.set()
        self.assertRaises(MyException, tt.join)
        # Event.isSet() is a deprecated alias of Event.is_set()
        self.assertTrue(tt.sync_event.is_set())
5652.1.4 by Vincent Ladeuil
Add CatchingExceptionThread.set_and_switch() to avoid race conditions.
87
5652.1.5 by Vincent Ladeuil
Align method name and implementation.
88
    def test_switch_and_set(self):
        """Caller can precisely control a thread.

        The thread advances one step each time the test sets the matching
        control event, and reports each step through switch_and_set().
        """
        control1 = threading.Event()
        control2 = threading.Event()
        control3 = threading.Event()

        class TestThread(cethread.CatchingExceptionThread):

            def __init__(self):
                super(TestThread, self).__init__(target=self.step_by_step)
                self.current_step = 'starting'
                # One event per step the test may want to observe
                self.step1 = threading.Event()
                self.step2 = threading.Event()
                self.final = threading.Event()
                # step1 is the first event the test waits on
                self.set_sync_event(self.step1)

            def step_by_step(self):
                # Each step waits for its control event, records where it
                # is, then switches to the event for the following step.
                control1.wait()
                self.current_step = 'step1'
                self.switch_and_set(self.step2)
                control2.wait()
                self.current_step = 'step2'
                self.switch_and_set(self.final)
                control3.wait()
                self.current_step = 'done'

        worker = TestThread()
        worker.start()
        self.assertEqual('starting', worker.current_step)
        checkpoints = (
            (control1, worker.step1, 'step1'),
            (control2, worker.step2, 'step2'),
        )
        for release, reached, expected in checkpoints:
            release.set()
            reached.wait()
            self.assertEqual(expected, worker.current_step)
        control3.set()
        # We don't wait on worker.final
        worker.join()
        self.assertEqual('done', worker.current_step)
5652.1.4 by Vincent Ladeuil
Add CatchingExceptionThread.set_and_switch() to avoid race conditions.
127
5652.1.5 by Vincent Ladeuil
Align method name and implementation.
128
    def test_exception_while_switch_and_set(self):
        """An exception raised while switching sync events is reported.

        The thread's set_sync_event() is overridden to raise when asked to
        install step2. The test checks that step1 still gets set, that
        pending_exception() raises the exception, and that step1 remains
        the thread's sync event.
        """
        control1 = threading.Event()

        class MyException(Exception):
            pass

        class TestThread(cethread.CatchingExceptionThread):

            def __init__(self, *args, **kwargs):
                # NOTE(review): step1/step2 are created before calling the
                # base __init__, presumably because it may end up calling
                # the set_sync_event() override below, which reads
                # self.step2 -- confirm against cethread.
                self.step1 = threading.Event()
                self.step2 = threading.Event()
                super(TestThread, self).__init__(target=self.step_by_step,
                                                 sync_event=self.step1)
                self.current_step = 'starting'
                # NOTE(review): step1 was already passed as sync_event
                # above; this call looks redundant -- confirm before
                # removing.
                self.set_sync_event(self.step1)

            def step_by_step(self):
                control1.wait()
                self.current_step = 'step1'
                self.switch_and_set(self.step2)

            def set_sync_event(self, event):
                # We force an exception while trying to set step2
                if event is self.step2:
                    raise MyException()
                super(TestThread, self).set_sync_event(event)

        tt = TestThread()
        tt.start()
        self.assertEqual('starting', tt.current_step)
        control1.set()
        # We now wait on step1 which will be set when catching the exception
        tt.step1.wait()
        self.assertRaises(MyException, tt.pending_exception)
        self.assertIs(tt.step1, tt.sync_event)
        # Event.isSet() is a deprecated alias of Event.is_set()
        self.assertTrue(tt.step1.is_set())