/loggerhead/trunk

To get this branch, use:
bzr branch http://gegoxaren.bato24.eu/bzr/loggerhead/trunk

« back to all changes in this revision

Viewing changes to loggerhead/load_test.py

  • Committer: Jelmer Vernooij
  • Date: 2018-10-20 17:34:46 UTC
  • mto: (491.6.1 breezy)
  • mto: This revision was merged to the branch mainline in revision 494.
  • Revision ID: jelmer@jelmer.uk-20181020173446-eywejowpq25sg8pw
Rename serve-branches to loggerhead-serve.

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
# Copyright 2011 Canonical Ltd
 
2
#
 
3
# This program is free software; you can redistribute it and/or modify
 
4
# it under the terms of the GNU General Public License as published by
 
5
# the Free Software Foundation; either version 2 of the License, or
 
6
# (at your option) any later version.
 
7
#
 
8
# This program is distributed in the hope that it will be useful,
 
9
# but WITHOUT ANY WARRANTY; without even the implied warranty of
 
10
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
11
# GNU General Public License for more details.
 
12
#
 
13
# You should have received a copy of the GNU General Public License
 
14
# along with this program; if not, write to the Free Software
 
15
# Foundation, Inc., 51 Franklin Street, Suite 500, Boston, MA 02110-1335  USA
 
16
 
 
17
"""Code to do some load testing against a loggerhead instance.
 
18
 
 
19
This is basically meant to take a list of actions to take, and run it against a
 
20
real host, and see how the results respond.::
 
21
 
 
22
    {"parameters": {
 
23
         "base_url": "http://localhost:8080",
 
24
     },
 
25
     "requests": [
 
26
        {"thread": "1", "relpath": "/changes"},
 
27
        {"thread": "1", "relpath": "/changes"},
 
28
        {"thread": "1", "relpath": "/changes"},
 
29
        {"thread": "1", "relpath": "/changes"}
 
30
     ],
 
31
    }
 
32
 
 
33
All threads have a Queue length of 1. When a third request for a given thread
 
34
is seen, no more requests are queued until that thread finishes its current
 
35
job. So this results in all requests being issued sequentially::
 
36
 
 
37
        {"thread": "1", "relpath": "/changes"},
 
38
        {"thread": "1", "relpath": "/changes"},
 
39
        {"thread": "1", "relpath": "/changes"},
 
40
        {"thread": "1", "relpath": "/changes"}
 
41
 
 
42
While this would cause all requests to be sent in parallel:
 
43
 
 
44
        {"thread": "1", "relpath": "/changes"},
 
45
        {"thread": "2", "relpath": "/changes"},
 
46
        {"thread": "3", "relpath": "/changes"},
 
47
        {"thread": "4", "relpath": "/changes"}
 
48
 
 
49
This should keep 2 threads pipelined with activity, as long as they finish in
 
50
approximately the same speed. We'll start the first thread running, and the
 
51
second thread, and queue up both with a second request once the first finishes.
 
52
When we get to the third request for thread "1", we block on queuing up more
 
53
work until the first thread 1 request has finished.
 
54
        {"thread": "1", "relpath": "/changes"},
 
55
        {"thread": "2", "relpath": "/changes"},
 
56
        {"thread": "1", "relpath": "/changes"},
 
57
        {"thread": "2", "relpath": "/changes"},
 
58
        {"thread": "1", "relpath": "/changes"},
 
59
        {"thread": "2", "relpath": "/changes"}
 
60
 
 
61
There is not currently a way to say "run all these requests keeping exactly 2
 
62
threads active". Though if you know the load pattern, you could approximate
 
63
this.
 
64
"""
 
65
 
 
66
import threading
 
67
import time
 
68
try:
 
69
    from queue import Queue, Empty
 
70
except ImportError:  # Python < 3
 
71
    from Queue import Queue, Empty
 
72
 
 
73
import simplejson
 
74
 
 
75
from breezy import (
 
76
    errors,
 
77
    transport,
 
78
    urlutils,
 
79
    )
 
80
from breezy.sixish import viewvalues
 
81
 
 
82
# This code will be doing multi-threaded requests against breezy.transport
 
83
# code. We want to make sure to load everything ahead of time, so we don't get
 
84
# lazy-import failures
 
85
_ = transport.get_transport('http://example.com')
 
86
 
 
87
 
 
88
class RequestDescription(object):
 
89
    """Describes info about a request."""
 
90
 
 
91
    def __init__(self, descrip_dict):
 
92
        self.thread = descrip_dict.get('thread', '1')
 
93
        self.relpath = descrip_dict['relpath']
 
94
 
 
95
 
 
96
class RequestWorker(object):
 
97
    """Process requests in a worker thread."""
 
98
 
 
99
    _timer = time.time
 
100
 
 
101
    def __init__(self, identifier, blocking_time=1.0, _queue_size=1):
 
102
        self.identifier = identifier
 
103
        self.queue = Queue(_queue_size)
 
104
        self.start_time = self.end_time = None
 
105
        self.stats = []
 
106
        self.blocking_time = blocking_time
 
107
 
 
108
    def step_next(self):
 
109
        url = self.queue.get(True, self.blocking_time)
 
110
        if url == '<noop>':
 
111
            # This is usually an indicator that we want to stop, so just skip
 
112
            # this one.
 
113
            self.queue.task_done()
 
114
            return
 
115
        self.start_time = self._timer()
 
116
        success = self.process(url)
 
117
        self.end_time = self._timer()
 
118
        self.update_stats(url, success)
 
119
        self.queue.task_done()
 
120
 
 
121
    def run(self, stop_event):
 
122
        while not stop_event.isSet():
 
123
            try:
 
124
                self.step_next()
 
125
            except Empty:
 
126
                pass
 
127
 
 
128
    def process(self, url):
 
129
        base, path = urlutils.split(url)
 
130
        t = transport.get_transport(base)
 
131
        try:
 
132
            # TODO: We should probably look into using some part of
 
133
            #       blocking_timeout to decide when to stop trying to read
 
134
            #       content
 
135
            content = t.get_bytes(path)
 
136
        except (errors.TransportError, errors.NoSuchFile):
 
137
            return False
 
138
        return True
 
139
 
 
140
    def update_stats(self, url, success):
 
141
        self.stats.append((url, success, self.end_time - self.start_time))
 
142
 
 
143
 
 
144
class ActionScript(object):
 
145
    """This tracks the actions that we want to perform."""
 
146
 
 
147
    _worker_class = RequestWorker
 
148
    _default_base_url = 'http://localhost:8080'
 
149
    _default_blocking_timeout = 60.0
 
150
 
 
151
    def __init__(self):
 
152
        self.base_url = self._default_base_url
 
153
        self.blocking_timeout = self._default_blocking_timeout
 
154
        self._requests = []
 
155
        self._threads = {}
 
156
        self.stop_event = threading.Event()
 
157
 
 
158
    @classmethod
 
159
    def parse(cls, content):
 
160
        script = cls()
 
161
        json_dict = simplejson.loads(content)
 
162
        if 'parameters' not in json_dict:
 
163
            raise ValueError('Missing "parameters" section')
 
164
        if 'requests' not in json_dict:
 
165
            raise ValueError('Missing "requests" section')
 
166
        param_dict = json_dict['parameters']
 
167
        request_list = json_dict['requests']
 
168
        base_url = param_dict.get('base_url', None)
 
169
        if base_url is not None:
 
170
            script.base_url = base_url
 
171
        blocking_timeout = param_dict.get('blocking_timeout', None)
 
172
        if blocking_timeout is not None:
 
173
            script.blocking_timeout = blocking_timeout
 
174
        for request_dict in request_list:
 
175
            script.add_request(request_dict)
 
176
        return script
 
177
 
 
178
    def add_request(self, request_dict):
 
179
        request = RequestDescription(request_dict)
 
180
        self._requests.append(request)
 
181
 
 
182
    def _get_worker(self, thread_id):
 
183
        if thread_id in self._threads:
 
184
            return self._threads[thread_id][0]
 
185
        handler = self._worker_class(thread_id,
 
186
                                     blocking_time=self.blocking_timeout/10.0)
 
187
 
 
188
        t = threading.Thread(target=handler.run, args=(self.stop_event,),
 
189
                             name='Thread-%s' % (thread_id,))
 
190
        self._threads[thread_id] = (handler, t)
 
191
        t.start()
 
192
        return handler
 
193
 
 
194
    def finish_queues(self):
 
195
        """Wait for all queues of all children to finish."""
 
196
        for h, t in viewvalues(self._threads):
 
197
            h.queue.join()
 
198
 
 
199
    def stop_and_join(self):
 
200
        """Stop all running workers, and return.
 
201
 
 
202
        This will stop even if workers still have work items.
 
203
        """
 
204
        self.stop_event.set()
 
205
        for h, t in viewvalues(self._threads):
 
206
            # Signal the queue that it should stop blocking, we don't have to
 
207
            # wait for the queue to empty, because we may see stop_event before
 
208
            # we see the <noop>
 
209
            h.queue.put('<noop>')
 
210
            # And join the controlling thread
 
211
            for i in range(10):
 
212
                t.join(self.blocking_timeout / 10.0)
 
213
                if not t.isAlive():
 
214
                    break
 
215
 
 
216
    def _full_url(self, relpath):
 
217
        return self.base_url + relpath
 
218
 
 
219
    def run(self):
 
220
        self.stop_event.clear()
 
221
        for request in self._requests:
 
222
            full_url = self._full_url(request.relpath)
 
223
            worker = self._get_worker(request.thread)
 
224
            worker.queue.put(full_url, True, self.blocking_timeout)
 
225
        self.finish_queues()
 
226
        self.stop_and_join()
 
227
 
 
228
 
 
229
def run_script(filename):
 
230
    with open(filename, 'rb') as f:
 
231
        content = f.read()
 
232
    script = ActionScript.parse(content)
 
233
    script.run()
 
234
    return script