MPIRedirect.py 8.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253
  1. # Copyright 2014 Modelling, Simulation and Design Lab (MSDL) at
  2. # McGill University and the University of Antwerp (http://msdl.cs.mcgill.ca/)
  3. #
  4. # Licensed under the Apache License, Version 2.0 (the "License");
  5. # you may not use this file except in compliance with the License.
  6. # You may obtain a copy of the License at
  7. #
  8. # http://www.apache.org/licenses/LICENSE-2.0
  9. #
  10. # Unless required by applicable law or agreed to in writing, software
  11. # distributed under the License is distributed on an "AS IS" BASIS,
  12. # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. # See the License for the specific language governing permissions and
  14. # limitations under the License.
  15. """
  16. Class containing a kind of RMI implementation over MPI.
  17. """
  18. oneways = frozenset(["simulate",
  19. "receiveControl",
  20. "receive",
  21. "finishSimulation",
  22. "notifyWait",
  23. "notifyRun",
  24. "prepare",
  25. "receiveAntiMessages",
  26. "migrationUnlock",
  27. "notifyMigration",
  28. "requestMigrationLock",
  29. "setGVT"])
  30. import pypdevs.middleware as middleware
  31. class MPIFaker(object):
  32. """
  33. A dummy implementation of MPI4Py if none is found
  34. """
  35. # Don't follow coding style here, as we need to be compatible with the mpi4py interface
  36. @staticmethod
  37. def Get_size():
  38. """
  39. Return the size of the MPI world. Always 1, since it is only used in cases where local simulation is done.
  40. :returns: int -- number of MPI processes running
  41. """
  42. return 1
  43. @staticmethod
  44. def Get_rank():
  45. """
  46. Return the rank of the current process in the MPI world. Always 0, since it is only used in cases where local simulation is done.
  47. :returns: int -- rank of the current process
  48. """
  49. return 0
  50. try:
  51. from mpi4py import MPI
  52. COMM_WORLD = MPI.COMM_WORLD
  53. except ImportError:
  54. # MPI4Py not found, fall back to the dummy implementation
  55. COMM_WORLD = MPIFaker()
  56. import threading
  57. from pypdevs.logger import *
  58. def cleaning():
  59. """
  60. Clean up the list of all waiting asynchronous connections
  61. Should be ran on a seperate thread and will simply wait on the connection status to be 'complete'. This is necessary for the MPI specification.
  62. """
  63. import pypdevs.accurate_time as time
  64. while 1:
  65. try:
  66. # This is atomic (at least where it matters)
  67. MPI.Request.Wait(MPIRedirect.lst.pop())
  68. except IndexError:
  69. # List is empty
  70. time.sleep(1)
  71. except:
  72. # Can happen during shutdown, though it won't be recognized as 'AttributeError'
  73. pass
  74. class MPIRedirect(object):
  75. """
  76. Redirect all calls to an instantiation of this class to the server for which it was created, uses MPI (or the dummy implementation).
  77. For speed, it contains an optimisation when the call is actually done locally (it will simply start a thread then). This complete
  78. implemenation is based on so called 'magic functions' from Python.
  79. """
  80. # Reserve 50 slots, this is (hopefully) way too much, though the backend would crash if we run out of these...
  81. # Honestly, if you have 50 connections for which you are waiting, you will have worse problems than running out of IDs
  82. waiting = [None] * 50
  83. # Don't use range itself, as this doesn't work in Python3
  84. free_ids = [i for i in range(50)]
  85. noproxy = frozenset(["__getnewargs__",
  86. "__getinitargs__",
  87. "__str__",
  88. "__repr__"])
  89. local = None
  90. lst = []
  91. if COMM_WORLD.Get_size() > 1:
  92. thrd = threading.Thread(target=cleaning, args=[])
  93. thrd.daemon = True
  94. thrd.start()
  95. def __init__(self, rank):
  96. """
  97. Constructor.
  98. :param rank: the rank of the server to redirect the call to
  99. :param oneways: iterable containing all functions that should be done without waiting for completion
  100. """
  101. self.rank = int(rank)
  102. self.oneway = oneways
  103. def __getinitargs__(self):
  104. """
  105. For pickling
  106. :returns: list containing the rank
  107. """
  108. return [self.rank]
  109. def __getstate__(self):
  110. """
  111. For pickling
  112. :returns: dictionary containing the rank and the oneway list
  113. """
  114. return {"rank": self.rank, "oneway": self.oneway}
  115. def __setstate__(self, state):
  116. """
  117. For pickling
  118. :param state: the dictionary provided by the *__getstate__* method
  119. """
  120. self.rank = state["rank"]
  121. self.oneway = state["oneway"]
  122. def __getattr__(self, name):
  123. """
  124. Determine whether or not we should redirect the call to the local or the remote server
  125. :param name: the name of the function to call
  126. :returns: function -- function to be actually called to perform the action
  127. """
  128. if name in MPIRedirect.noproxy:
  129. raise AttributeError(name)
  130. def newcall(*args, **kwargs):
  131. """
  132. A call to a remote location
  133. """
  134. return MPIRedirect.remoteCall(self, name, *args, **kwargs)
  135. return newcall
  136. def remoteCall(self, method, *args, **kwargs):
  137. """
  138. Make the remote call
  139. :param method: method name to call (as a string)
  140. :returns: return value of the called method; always None in case it is a one-way call
  141. """
  142. # Unique tag, but at least 2 (0 reserved for exit, 1 is reserved for calls)
  143. wait = str(method) not in self.oneway
  144. if wait:
  145. call_id = MPIRedirect.free_ids.pop()
  146. else:
  147. # Mention that we are not waiting for a reply
  148. call_id = None
  149. data = [call_id, method, args, kwargs]
  150. if wait:
  151. MPIRedirect.waiting[call_id] = event = threading.Event()
  152. MPIRedirect.lst.append(COMM_WORLD.isend(data, dest=self.rank, tag=1))
  153. if wait:
  154. event.wait()
  155. response = MPIRedirect.waiting[call_id]
  156. # Clear the object from memory
  157. MPIRedirect.waiting[call_id] = None
  158. MPIRedirect.free_ids.append(call_id)
  159. return response
  160. class LocalRedirect(object):
  161. """
  162. Local redirector class
  163. """
  164. def localCall(self, method, *args, **kwargs):
  165. """
  166. Actually perform the local call
  167. :param method: the name of the method
  168. :returns: the return value of the function, None if it is a oneway call
  169. """
  170. func = getattr(self.server, method)
  171. if str(method) in self.oneway:
  172. threading.Thread(target=func, args=args, kwargs=kwargs).start()
  173. else:
  174. return func(*args, **kwargs)
  175. def __init__(self, server):
  176. """
  177. Constructor.
  178. :param server: the local server
  179. """
  180. self.server = server
  181. self.oneway = oneways
  182. def __getattr__(self, name):
  183. """
  184. Determine whether or not we should redirect the call to the local or the remote server
  185. :param name: the name of the function to call
  186. :returns: function -- function to be actually called to perform the action
  187. """
  188. if name in MPIRedirect.noproxy:
  189. raise AttributeError(name)
  190. def localcall(*args, **kwargs):
  191. """
  192. A call to a local location
  193. """
  194. return LocalRedirect.localCall(self, name, *args, **kwargs)
  195. return localcall
  196. def __getinitargs__(self):
  197. """
  198. For pickling
  199. :returns: list containing the rank
  200. """
  201. return [self.server]
  202. def __getstate__(self):
  203. """
  204. For pickling
  205. :returns: dictionary containing the rank and the oneway list
  206. """
  207. return {"oneway": self.oneway}
  208. def __setstate__(self, state):
  209. """
  210. For pickling
  211. :param state: the dictionary provided by the *__getstate__* method
  212. """
  213. self.oneway = state["oneway"]
  214. # No need to save the server, as it is impossible to restore it anyway