Source code for pypdevs.threadpool
# Copyright 2014 Modelling, Simulation and Design Lab (MSDL) at
# McGill University and the University of Antwerp (http://msdl.cs.mcgill.ca/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
A thread pool to process incoming messages over MPI with a fixed number of
(already running) threads.
Based on the thread pool implementation found at
http://stackoverflow.com/questions/3033952/python-thread-pool-similar-to-the-multiprocessing-pool
"""
import traceback
from threading import Thread

try:
    import Queue as queue
except ImportError:
    import queue
class Worker(Thread):
    """
    Thread executing tasks from a given tasks queue

    The thread marks itself as a daemon and starts running as soon as it is
    constructed.
    """

    def __init__(self, tasks):
        """
        Constructor

        :param tasks: queue containing tasks to execute; every item taken
                      from it is unpacked as a ``(func, args, kwargs)`` tuple
        """
        Thread.__init__(self)
        self.tasks = tasks
        # Daemon threads do not keep the interpreter alive at shutdown
        self.daemon = True
        self.start()

    def run(self):
        """
        Run the worker thread

        Loops forever: blocks until a task is available, calls it, and marks
        it as done on the queue whether or not the call raised.
        """
        while True:
            func, args, kwargs = self.tasks.get()
            try:
                func(*args, **kwargs)
            except Exception:
                # A failing task must not kill the worker, but printing only
                # the message loses the exception type and the location of
                # the failure: report the complete traceback instead.
                traceback.print_exc()
            finally:
                # Always acknowledge the task, otherwise anyone join()ing on
                # the queue would block forever after a failure.
                self.tasks.task_done()
class ThreadPool(object):
    """
    Fixed-size pool of worker threads fed from one shared task queue
    """

    def __init__(self, num_threads):
        """
        Constructor

        Creates the shared task queue and starts one worker per requested
        thread.

        :param num_threads: number of threads to start
        """
        shared_queue = queue.Queue()
        self.tasks = shared_queue
        self.num_threads = num_threads
        for _index in range(num_threads):
            # Each worker starts itself on construction, no reference needed
            Worker(shared_queue)

    def addTask(self, func, *args, **kwargs):
        """
        Add a task to the queue

        :param func: function to execute
        :param args: positional arguments handed to the function
        :param kwargs: keyword arguments handed to the function
        """
        task = (func, args, kwargs)
        self.tasks.put(task)

    def __getstate__(self):
        """
        For pickling

        :returns: the number of threads, which is the only state saved
        """
        # The queue cannot be pickled, so it is left out of the state
        return self.num_threads

    def __setstate__(self, state):
        """
        For pickling

        :param state: the number of threads, as saved by __getstate__
        """
        # The restored object starts out empty: rebuild it from scratch,
        # with a new queue and new workers
        self.__init__(state)