# Copyright 2011 Canonical Ltd
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Suite 500, Boston, MA 02110-1335  USA

"""Code to do some load testing against a loggerhead instance.

This is basically meant to take a list of actions, run it against a real
host, and see how the server responds::

    {"parameters": {
         "base_url": "http://localhost:8080"
     },
     "requests": [
        {"thread": "1", "relpath": "/changes"},
        {"thread": "1", "relpath": "/changes"},
        {"thread": "1", "relpath": "/changes"},
        {"thread": "1", "relpath": "/changes"}
     ]
    }

All threads have a Queue length of 1. When a third request for a given thread
is seen, no more requests are queued until that thread finishes its current
job. So this results in all requests being issued sequentially::

    {"thread": "1", "relpath": "/changes"},
    {"thread": "1", "relpath": "/changes"},
    {"thread": "1", "relpath": "/changes"},
    {"thread": "1", "relpath": "/changes"}

While this would cause all requests to be sent in parallel::

    {"thread": "1", "relpath": "/changes"},
    {"thread": "2", "relpath": "/changes"},
    {"thread": "3", "relpath": "/changes"},
    {"thread": "4", "relpath": "/changes"}

This should keep 2 threads pipelined with activity, as long as they finish at
approximately the same speed. We'll start the first thread running, then the
second thread, and queue up a second request for each once its first request
finishes. When we get to the third request for thread "1", we block on queuing
up more work until the first thread "1" request has finished::

    {"thread": "1", "relpath": "/changes"},
    {"thread": "2", "relpath": "/changes"},
    {"thread": "1", "relpath": "/changes"},
    {"thread": "2", "relpath": "/changes"},
    {"thread": "1", "relpath": "/changes"},
    {"thread": "2", "relpath": "/changes"}

There is not currently a way to say "run all these requests keeping exactly 2
threads active", though if you know the load pattern, you could approximate
this.
"""

import threading
import time

try:
    from queue import Queue, Empty
except ImportError:  # Python < 3
    from Queue import Queue, Empty

try:
    import simplejson
except ImportError:  # fall back to the stdlib json module
    import json as simplejson

from breezy import errors, transport, urlutils
from breezy.sixish import viewvalues

# This code will be doing multi-threaded requests against breezy.transport
# code. We want to make sure to load everything ahead of time, so we don't get
# lazy-import failures.
_ = transport.get_transport('http://example.com')


class RequestDescription(object):
    """Describes info about a request."""

    def __init__(self, descrip_dict):
        self.thread = descrip_dict.get('thread', '1')
        self.relpath = descrip_dict['relpath']


class RequestWorker(object):
    """Process requests in a worker thread."""

    # Hook point for timing, so tests can substitute a deterministic timer.
    _timer = time.time

    def __init__(self, identifier, blocking_time=1.0, _queue_size=1):
        self.identifier = identifier
        self.queue = Queue(_queue_size)
        self.start_time = self.end_time = None
        self.stats = []
        self.blocking_time = blocking_time

    def step_next(self):
        url = self.queue.get(True, self.blocking_time)
        if url == '<noop>':
            # This is usually an indicator that we want to stop, so just skip
            # this one.
            self.queue.task_done()
            return
        self.start_time = self._timer()
        success = self.process(url)
        self.end_time = self._timer()
        self.update_stats(url, success)
        self.queue.task_done()
121
def run(self, stop_event):
122
while not stop_event.isSet():

    def process(self, url):
        base, path = urlutils.split(url)
        t = transport.get_transport(base)
        try:
            # TODO: We should probably look into using some part of
            #       blocking_timeout to decide when to stop trying to read
            #       content
            content = t.get_bytes(path)
        except (errors.TransportError, errors.NoSuchFile):
            return False
        return True

    def update_stats(self, url, success):
        self.stats.append((url, success, self.end_time - self.start_time))
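
# A standalone usage sketch for RequestWorker (illustrative only; ActionScript
# below is the normal driver, and this introduces no new APIs):
#
#   stop = threading.Event()
#   worker = RequestWorker('1', blocking_time=1.0)
#   t = threading.Thread(target=worker.run, args=(stop,))
#   t.start()
#   worker.queue.put('http://localhost:8080/changes')
#   worker.queue.join()         # wait until the request has been processed
#   stop.set()
#   worker.queue.put('<noop>')  # unblock the final queue.get()
#   t.join()
#   worker.stats                # [(url, success, elapsed_seconds)]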


class ActionScript(object):
    """This tracks the actions that we want to perform."""

    _worker_class = RequestWorker
    _default_base_url = 'http://localhost:8080'
    _default_blocking_timeout = 60.0

    def __init__(self):
        self.base_url = self._default_base_url
        self.blocking_timeout = self._default_blocking_timeout
        self._requests = []
        self._threads = {}
        self.stop_event = threading.Event()

    @classmethod
    def parse(cls, content):
        script = cls()
        json_dict = simplejson.loads(content)
        if 'parameters' not in json_dict:
            raise ValueError('Missing "parameters" section')
        if 'requests' not in json_dict:
            raise ValueError('Missing "requests" section')
        param_dict = json_dict['parameters']
        request_list = json_dict['requests']
        base_url = param_dict.get('base_url', None)
        if base_url is not None:
            script.base_url = base_url
        blocking_timeout = param_dict.get('blocking_timeout', None)
        if blocking_timeout is not None:
            script.blocking_timeout = blocking_timeout
        for request_dict in request_list:
            script.add_request(request_dict)
        return script
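
    # A usage sketch for parse() (the JSON literal here is illustrative):
    #
    #   script = ActionScript.parse(
    #       '{"parameters": {"base_url": "http://localhost:8080"},'
    #       ' "requests": [{"thread": "1", "relpath": "/changes"}]}')
    #   script.run()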

    def add_request(self, request_dict):
        request = RequestDescription(request_dict)
        self._requests.append(request)

    def _get_worker(self, thread_id):
        if thread_id in self._threads:
            return self._threads[thread_id][0]
        handler = self._worker_class(thread_id,
                                     blocking_time=self.blocking_timeout/10.0)

        t = threading.Thread(target=handler.run, args=(self.stop_event,),
                             name='Thread-%s' % (thread_id,))
        self._threads[thread_id] = (handler, t)
        t.start()
        return handler

    def finish_queues(self):
        """Wait for all queues of all children to finish."""
        for h, t in viewvalues(self._threads):
            h.queue.join()

    def stop_and_join(self):
        """Stop all running workers, and return.

        This will stop even if workers still have work items.
        """
        self.stop_event.set()
        for h, t in viewvalues(self._threads):
            # Signal the queue that it should stop blocking; we don't have to
            # wait for the queue to empty, because we may see stop_event before
            # we see the '<noop>'.
            h.queue.put('<noop>')
            # And join the controlling thread
            while t.is_alive():
                t.join(self.blocking_timeout / 10.0)

    def _full_url(self, relpath):
        return self.base_url + relpath

    def run(self):
        self.stop_event.clear()
        for request in self._requests:
            full_url = self._full_url(request.relpath)
            worker = self._get_worker(request.thread)
            worker.queue.put(full_url, True, self.blocking_timeout)
        self.finish_queues()
        self.stop_and_join()


def run_script(filename):
    with open(filename, 'rb') as f:
        content = f.read()
    script = ActionScript.parse(content)
    script.run()
    return script
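

# A minimal command-line driver sketch. The __main__ guard is an addition for
# illustration, not part of the module's public interface; adjust as needed.
if __name__ == '__main__':
    import sys
    script = run_script(sys.argv[1])
    # Dump the per-request timings gathered by each worker.
    for handler, thread in viewvalues(script._threads):
        for url, success, elapsed in handler.stats:
            print('%s %s %.3fs' % ('OK' if success else 'FAIL', url, elapsed))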