threadpool.py 2.5 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889
  1. # Copyright 2014 Modelling, Simulation and Design Lab (MSDL) at
  2. # McGill University and the University of Antwerp (http://msdl.cs.mcgill.ca/)
  3. #
  4. # Licensed under the Apache License, Version 2.0 (the "License");
  5. # you may not use this file except in compliance with the License.
  6. # You may obtain a copy of the License at
  7. #
  8. # http://www.apache.org/licenses/LICENSE-2.0
  9. #
  10. # Unless required by applicable law or agreed to in writing, software
  11. # distributed under the License is distributed on an "AS IS" BASIS,
  12. # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. # See the License for the specific language governing permissions and
  14. # limitations under the License.
  15. """
  16. A threadpool to process incoming messages over MPI with a fixed number of
  17. (already running) threads.
  18. Based on threadpool implementation found at http://stackoverflow.com/
  19. questions/3033952/python-thread-pool-similar-to-the-multiprocessing-pool
  20. """
  21. try:
  22. import Queue as queue
  23. except ImportError:
  24. import queue
  25. from threading import Thread
  26. class Worker(Thread):
  27. """Thread executing tasks from a given tasks queue"""
  28. def __init__(self, tasks):
  29. """
  30. Constructor
  31. :param tasks: queue containing tasks to execute
  32. """
  33. Thread.__init__(self)
  34. self.tasks = tasks
  35. self.daemon = True
  36. self.start()
  37. def run(self):
  38. """
  39. Run the worker thread
  40. """
  41. while 1:
  42. func, args, kargs = self.tasks.get()
  43. try:
  44. func(*args, **kargs)
  45. except Exception as e:
  46. print(e)
  47. finally:
  48. self.tasks.task_done()
  49. class ThreadPool(object):
  50. """Pool of threads consuming tasks from a queue"""
  51. def __init__(self, num_threads):
  52. """
  53. Constructor
  54. :param num_threads: number of threads to start
  55. """
  56. self.tasks = queue.Queue()
  57. self.num_threads = num_threads
  58. for _ in range(num_threads):
  59. Worker(self.tasks)
  60. def __setstate__(self, state):
  61. """
  62. For pickling
  63. """
  64. # Obj will be empty, accept it though
  65. self.__init__(state)
  66. def __getstate__(self):
  67. """
  68. For pickling
  69. """
  70. # A queue is unpicklable...
  71. return self.num_threads
  72. def addTask(self, func, *args, **kwargs):
  73. """
  74. Add a task to the queue
  75. :param func: function to execute
  76. """
  77. self.tasks.put((func, args, kwargs))