Module threadpool
[frames] | no frames]

Source Code for Module threadpool

  1  # -*- coding: UTF-8 -*- 
  2  """Easy to use object-oriented thread pool framework. 
  3   
  4  A thread pool is an object that maintains a pool of worker threads to perform 
  5  time consuming operations in parallel. It assigns jobs to the threads 
  6  by putting them in a work request queue, where they are picked up by the 
  7  next available thread. This then performs the requested operation in the 
  8  background and puts the results in another queue. 
  9   
 10  The thread pool object can then collect the results from all threads from 
 11  this queue as soon as they become available or after all threads have 
 12  finished their work. It's also possible to define callbacks to handle 
 13  each result as it comes in. 
 14   
 15  The basic concept and some code was taken from the book "Python in a Nutshell, 
 16  2nd edition" by Alex Martelli, O'Reilly 2006, ISBN 0-596-10046-9, from section 
 17  14.5 "Threaded Program Architecture". I wrapped the main program logic in the 
 18  ThreadPool class, added the WorkRequest class and the callback system and 
 19  tweaked the code here and there. Kudos also to Florent Aide for the exception 
 20  handling mechanism. 
 21   
 22  Basic usage:: 
 23   
 24      >>> pool = ThreadPool(poolsize) 
 25      >>> requests = makeRequests(some_callable, list_of_args, callback) 
 26      >>> [pool.putRequest(req) for req in requests] 
 27      >>> pool.wait() 
 28   
 29  See the end of the module code for a brief, annotated usage example. 
 30   
 31  Website : http://chrisarndt.de/projects/threadpool/ 
 32   
 33  """ 
 34  __docformat__ = "restructuredtext en" 
 35   
 36  __all__ = [ 
 37      'makeRequests', 
 38      'NoResultsPending', 
 39      'NoWorkersAvailable', 
 40      'ThreadPool', 
 41      'WorkRequest', 
 42      'WorkerThread' 
 43  ] 
 44   
 45  __author__ = "Christopher Arndt" 
 46  __version__ = '1.2.7' 
 47  __revision__ = "$Revision: 416 $" 
 48  __date__ = "$Date: 2009-10-07 05:41:27 +0200 (Wed, 07 Oct 2009) $" 
 49  __license__ = "MIT license" 
 50   
 51   
 52  # standard library modules 
 53  import sys 
 54  import threading 
 55  import Queue 
 56  import traceback 
 57   
 58   
 59  # exceptions 
class NoResultsPending(Exception):
    """Signals that every submitted work request has been processed."""
63
class NoWorkersAvailable(Exception):
    """Signals that no worker thread is left to process pending requests."""
# internal module helper functions
def _handle_thread_exception(request, exc_info):
    """Report a failed work request by printing its traceback.

    This function is the default value of the ``exc_callback`` parameters
    in this module. ``request`` is accepted only to match the callback
    signature and is not used. ``exc_info`` is unpacked into the positional
    arguments of ``traceback.print_exception``, so it is expected to be a
    tuple as returned by ``sys.exc_info()``.

    """
    traceback.print_exception(*exc_info)
# utility functions
def makeRequests(callable_, args_list, callback=None,
        exc_callback=_handle_thread_exception):
    """Build one ``WorkRequest`` per entry of ``args_list`` for ``callable_``.

    Convenience function for the common case where the same callable is to
    be invoked several times, each time with different arguments.

    Every item of ``args_list`` describes one invocation:

    * a tuple is read as ``(positional_args, keyword_args)``; its first two
      elements are handed to ``WorkRequest`` as ``args`` and ``kwds``;
    * any other value is treated as the single positional argument of the
      call.

    ``callback`` and ``exc_callback`` are attached unchanged to every
    request; see the ``WorkRequest`` docstring for their meaning.

    Returns the list of created requests, in the order of ``args_list``.

    """
    def build(item):
        # Normalise both accepted item forms to an (args, kwds) pair.
        if isinstance(item, tuple):
            args, kwds = item[0], item[1]
        else:
            args, kwds = [item], None
        return WorkRequest(callable_, args, kwds, callback=callback,
            exc_callback=exc_callback)

    return [build(item) for item in args_list]
# classes
class WorkerThread(threading.Thread):
    """Daemon thread that serves a requests queue and feeds a results queue.

    A worker repeatedly takes a work request from one queue, executes it and
    puts the outcome on another queue, until ``dismiss`` has been called.

    """

    def __init__(self, requests_queue, results_queue, poll_timeout=5, **kwds):
        """Configure the worker as a daemon thread and start it immediately.

        ``requests_queue`` and ``results_queue`` are the ``Queue.Queue``
        instances the worker takes requests from and puts results on.

        ``poll_timeout`` is the maximum number of seconds a single wait for
        a request lasts before the dismissal flag is checked again.

        Additional keyword arguments are passed on to
        ``threading.Thread.__init__``.

        """
        threading.Thread.__init__(self, **kwds)
        self.setDaemon(1)
        self._dismissed = threading.Event()
        self._poll_timeout = poll_timeout
        self._results_queue = results_queue
        self._requests_queue = requests_queue
        # Starting has to come last: run() uses the attributes set above.
        self.start()

    def run(self):
        """Process work requests until the worker has been dismissed."""
        dismissed = self._dismissed.isSet
        while not dismissed():
            # Wait at most ``_poll_timeout`` seconds for a request, so that
            # a dismissal is noticed even while the queue stays empty.
            try:
                request = self._requests_queue.get(True, self._poll_timeout)
            except Queue.Empty:
                continue
            if dismissed():
                # Dismissed while waiting: put the request back on the queue
                # and stop.
                self._requests_queue.put(request)
                return
            try:
                outcome = request.callable(*request.args, **request.kwds)
                self._results_queue.put((request, outcome))
            except:
                # Anything raised here is reported through the results queue
                # (with the request flagged) instead of ending the thread.
                request.exception = True
                self._results_queue.put((request, sys.exc_info()))

    def dismiss(self):
        """Set the flag that makes ``run`` exit after the current job."""
        self._dismissed.set()
165 166
class WorkRequest:
    """A callable with its arguments and callbacks, ready to be queued.

    Use the module function ``makeRequests`` for the common case of building
    several ``WorkRequest`` objects for the same callable, each with
    different arguments.

    """

    def __init__(self, callable_, args=None, kwds=None, requestID=None,
            callback=None, exc_callback=_handle_thread_exception):
        """Store the callable, its arguments and the callbacks to use.

        ``callable_`` is what a worker thread will execute, with ``args`` as
        positional arguments and ``kwds`` as keyword arguments. A false
        value for ``args`` or ``kwds`` (including the default ``None``) is
        replaced by a new empty list or dictionary.

        ``callback``, if given, is called when the result of the request is
        picked up from the results queue. It receives two positional
        arguments: this ``WorkRequest`` and the return value of the
        callable. Extra information for the callback can simply be attached
        to the request object as attributes.

        ``exc_callback`` is called instead when the callable raised an
        exception. It also receives two positional arguments: this
        ``WorkRequest`` and the tuple returned by ``sys.exc_info()``. The
        default prints the exception via ``traceback.print_exception``;
        pass ``None`` for no exception handler.

        ``requestID``, if given, must be hashable, otherwise ``TypeError``
        is raised. The ``requestID`` attribute is set to
        ``hash(requestID)``, or to ``id(self)`` when no ID is given, and is
        what ``ThreadPool`` uses as dictionary key for the request.

        """
        # NOTE(review): only the hash of a caller-supplied ID is kept, so
        # two different IDs with equal hashes end up with the same
        # ``requestID`` -- confirm whether this is intended.
        if requestID is not None:
            try:
                requestID = hash(requestID)
            except TypeError:
                raise TypeError("requestID must be hashable.")
        else:
            requestID = id(self)
        self.requestID = requestID
        self.callable = callable_
        self.args = args or []
        self.kwds = kwds or {}
        self.callback = callback
        self.exc_callback = exc_callback
        # Set to True by the worker thread when the callable raised.
        self.exception = False

    def __str__(self):
        """Return a short description with ID, arguments and error flag."""
        template = "<WorkRequest id=%s args=%r kwargs=%r exception=%s>"
        values = (self.requestID, self.args, self.kwds, self.exception)
        return template % values
220
class ThreadPool:
    """A thread pool, distributing work requests and collecting results.

    Work requests are handed to the worker threads through a requests queue;
    the workers put their outcomes on a results queue, from which ``poll``
    and ``wait`` collect them and dispatch them to the request callbacks.

    Attributes:

    ``workers``
        list of the active ``WorkerThread`` objects
    ``dismissedWorkers``
        list of dismissed workers that have not been joined yet
    ``workRequests``
        dictionary mapping ``requestID`` to every request whose result has
        not been collected yet

    """

    def __init__(self, num_workers, q_size=0, resq_size=0, poll_timeout=5):
        """Set up the thread pool and start num_workers worker threads.

        ``num_workers`` is the number of worker threads to start initially.

        If ``q_size > 0`` the size of the work *request queue* is limited and
        the thread pool blocks when the queue is full and it tries to put
        more work requests in it (see ``putRequest`` method), unless you also
        use a positive ``timeout`` value for ``putRequest``.

        If ``resq_size > 0`` the size of the *results queue* is limited and
        the worker threads will block when the queue is full and they try to
        put new results in it.

        ``poll_timeout`` is passed on to ``createWorkers``.

        .. warning::
            If you set both ``q_size`` and ``resq_size`` to ``!= 0`` there is
            the possibility of a deadlock, when the results queue is not
            pulled regularly and too many jobs are put in the work requests
            queue. To prevent this, always set ``timeout > 0`` when calling
            ``ThreadPool.putRequest()`` and catch ``Queue.Full`` exceptions.

        """
        self._requests_queue = Queue.Queue(q_size)
        self._results_queue = Queue.Queue(resq_size)
        self.workers = []
        self.dismissedWorkers = []
        self.workRequests = {}
        self.createWorkers(num_workers, poll_timeout)

    def createWorkers(self, num_workers, poll_timeout=5):
        """Add num_workers worker threads to the pool.

        ``poll_timeout`` sets the interval in seconds (int or float) for how
        often threads should check whether they are dismissed, while waiting
        for requests.

        """
        for _ in range(num_workers):
            self.workers.append(WorkerThread(self._requests_queue,
                self._results_queue, poll_timeout=poll_timeout))

    def dismissWorkers(self, num_workers, do_join=False):
        """Tell num_workers worker threads to quit after their current task.

        At most ``len(self.workers)`` workers are dismissed. If ``do_join``
        is true, the dismissed threads are joined right away; otherwise they
        are remembered in ``dismissedWorkers`` so that
        ``joinAllDismissedWorkers`` can join them later.

        """
        dismiss_list = []
        for _ in range(min(num_workers, len(self.workers))):
            worker = self.workers.pop()
            worker.dismiss()
            dismiss_list.append(worker)

        if do_join:
            for worker in dismiss_list:
                worker.join()
        else:
            self.dismissedWorkers.extend(dismiss_list)

    def joinAllDismissedWorkers(self):
        """Perform Thread.join() on all dismissed worker threads."""
        for worker in self.dismissedWorkers:
            worker.join()
        self.dismissedWorkers = []

    def putRequest(self, request, block=True, timeout=None):
        """Put work request into work queue and save its id for later.

        ``request`` must be a ``WorkRequest`` that has not been flagged with
        an exception by a previous run (both conditions are checked with
        ``assert``). ``block`` and ``timeout`` are passed on to
        ``Queue.put``; the request is only registered in ``workRequests``
        once it has been queued successfully.

        """
        assert isinstance(request, WorkRequest)
        # don't reuse old work requests
        assert not getattr(request, 'exception', None)
        self._requests_queue.put(request, block, timeout)
        self.workRequests[request.requestID] = request

    def poll(self, block=False):
        """Process any new results in the queue.

        Every result taken from the results queue is passed to the
        ``exc_callback`` of its request, if the request raised an exception
        and has such a callback, or otherwise to its ``callback``, if any.
        Afterwards the request is removed from ``workRequests``.

        With ``block`` false (the default) the method returns as soon as the
        results queue is empty; with ``block`` true it waits for each result.

        Raises ``NoResultsPending`` when no registered request is left, and
        ``NoWorkersAvailable`` when ``block`` is true and no worker is left
        to process the remaining requests. Exceptions raised by a callback
        propagate to the caller.

        """
        while True:
            # still results pending?
            if not self.workRequests:
                raise NoResultsPending
            # are there still workers to process remaining requests?
            elif block and not self.workers:
                raise NoWorkersAvailable
            # Only the queue access is guarded, so that a ``Queue.Empty``
            # raised inside a callback is not mistaken for an empty queue.
            try:
                request, result = self._results_queue.get(block=block)
            except Queue.Empty:
                break
            try:
                if request.exception and request.exc_callback:
                    # an exception occurred: hand its details to the handler
                    request.exc_callback(request, result)
                elif request.callback:
                    # hand results to callback, if any
                    request.callback(request, result)
            finally:
                # Always forget the request, even if its callback raised;
                # otherwise it would stay registered forever and ``wait``
                # could never finish. ``pop`` with a default is used so a
                # missing key cannot mask the callback's own exception.
                self.workRequests.pop(request.requestID, None)

    def wait(self):
        """Wait for results, blocking until all have arrived.

        Raises ``NoWorkersAvailable`` if requests are still pending but no
        worker is left to process them.

        """
        while True:
            try:
                self.poll(True)
            except NoResultsPending:
                break
################
# USAGE EXAMPLE
################

if __name__ == '__main__':
    import random
    import time

    # the work the threads will have to do (rather trivial in our example)
    def do_something(data):
        """Sleep 1-5 seconds, then return a random fraction of ``data``.

        Raises ``RuntimeError`` whenever the computed value exceeds 5.
        """
        time.sleep(random.randint(1,5))
        result = round(random.random() * data, 5)
        # just to show off, we throw an exception once in a while
        if result > 5:
            raise RuntimeError("Something extraordinary happened!")
        return result

    # this will be called each time a result is available
    # NOTE(review): the definition was missing from this listing although it
    # is referenced below; restored here -- confirm the message text.
    def print_result(request, result):
        """Print the result of a successfully processed request."""
        print("**** Result from request #%s: %r" % (
            request.requestID, result))

    # this will be called when an exception occurs within a thread
    # this example exception handler does little more than the default handler
    def handle_exception(request, exc_info):
        """Print the exception details of a failed request."""
        if not isinstance(exc_info, tuple):
            # Something is seriously wrong...
            print(request)
            print(exc_info)
            raise SystemExit
        print("**** Exception occured in request #%s: %s" % (
            request.requestID, exc_info))

    # assemble the arguments for each job to a list...
    data = [random.randint(1,10) for i in range(20)]
    # ... and build a WorkRequest object for each item in data
    requests = makeRequests(do_something, data, print_result, handle_exception)
    # to use the default exception handler, uncomment next line and comment out
    # the preceding one.
    #requests = makeRequests(do_something, data, print_result)

    # or the other form of args_lists accepted by makeRequests: ((,), {})
    data = [((random.randint(1,10),), {}) for i in range(20)]
    requests.extend(
        makeRequests(do_something, data, print_result, handle_exception)
        #makeRequests(do_something, data, print_result)
        # to use the default exception handler, uncomment next line and comment
        # out the preceding one.
    )

    # we create a pool of 3 worker threads
    print("Creating thread pool with 3 worker threads.")
    main = ThreadPool(3)

    # then we put the work requests in the queue...
    for req in requests:
        main.putRequest(req)
        print("Work request #%s added." % req.requestID)
    # or shorter:
    # [main.putRequest(req) for req in requests]

    # ...and wait for the results to arrive in the result queue
    # by using ThreadPool.wait(). This would block until results for
    # all work requests have arrived:
    # main.wait()

    # instead we can poll for results while doing something else:
    i = 0
    while True:
        try:
            time.sleep(0.5)
            main.poll()
            # one call with a single argument prints the same line on
            # Python 2 as the former ``print ...,`` / ``print ...`` pair
            print("Main thread working... (active worker threads: %i)" % (
                threading.activeCount()-1, ))
            if i == 10:
                print("**** Adding 3 more worker threads...")
                main.createWorkers(3)
            if i == 20:
                print("**** Dismissing 2 worker threads...")
                main.dismissWorkers(2)
            i += 1
        except KeyboardInterrupt:
            print("**** Interrupted!")
            break
        except NoResultsPending:
            print("**** No pending results.")
            break
    if main.dismissedWorkers:
        print("Joining all dismissed worker threads...")
        main.joinAllDismissedWorkers()