server.py 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311
  1. # Copyright 2014 Modelling, Simulation and Design Lab (MSDL) at
  2. # McGill University and the University of Antwerp (http://msdl.cs.mcgill.ca/)
  3. #
  4. # Licensed under the Apache License, Version 2.0 (the "License");
  5. # you may not use this file except in compliance with the License.
  6. # You may obtain a copy of the License at
  7. #
  8. # http://www.apache.org/licenses/LICENSE-2.0
  9. #
  10. # Unless required by applicable law or agreed to in writing, software
  11. # distributed under the License is distributed on an "AS IS" BASIS,
  12. # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. # See the License for the specific language governing permissions and
  14. # limitations under the License.
  15. """
  16. Server for DEVS simulation
  17. """
  18. from pypdevs.basesimulator import BaseSimulator
  19. from pypdevs.controller import Controller
  20. import pypdevs.middleware as middleware
  21. from pypdevs.threadpool import ThreadPool
  22. import threading
  23. import sys
  24. from pypdevs.util import *
  25. from pypdevs.logger import *
  26. class Server(object):
  27. """
  28. A server to host MPI, will delegate all of its calls to the active simulation kernel.
  29. """
  30. # Don't forward some of the internally provided functions, but simply raise an AttributeError
  31. noforward = frozenset(["__str__",
  32. "__getstate__",
  33. "__setstate__",
  34. "__repr__"])
  35. def __init__(self, name, total_size):
  36. """
  37. Constructor
  38. :param name: the name of the server, used for addressing (in MPI terms, this is the rank)
  39. :param total_size: the total size of the network in which the model lives
  40. """
  41. self.name = name
  42. self.kernel = None
  43. self.size = total_size
  44. self.proxies = [MPIRedirect(i) for i in range(total_size)]
  45. from pypdevs.MPIRedirect import LocalRedirect
  46. self.proxies[name] = LocalRedirect(self)
  47. self.queued_messages = []
  48. self.queued_time = None
  49. if total_size > 1:
  50. self.threadpool = ThreadPool(2)
  51. self.bootMPI()
  52. def getProxy(self, rank):
  53. """
  54. Get a proxy to a specified rank.
  55. This rank is allowed to be the local server, in which case a local shortcut is created.
  56. :param rank: the rank to return a proxy to, should be an int
  57. :returns: proxy to the server, either of type MPIRedirect or LocalRedirect
  58. """
  59. return self.proxies[rank]
  60. def checkLoadCheckpoint(self, name, gvt):
  61. """
  62. Reconstruct the server from a checkpoint.
  63. :param name: name of the checkpoint
  64. :param gvt: the GVT to restore to
  65. :returns: bool -- whether or not the checkpoint was successfully loaded
  66. """
  67. rank = self.name
  68. #assert debug("Accessing file " + str("%s_%s_%s.pdc" % (name, gvt, rank)))
  69. try:
  70. infile = open("%s_%s_%s.pdc" % (name, gvt, rank), 'r')
  71. pickle.load(infile)
  72. return True
  73. except KeyboardInterrupt:
  74. # If the user interrupts, still reraise
  75. raise
  76. except Exception as e:
  77. # Something went wrong
  78. print("Error found: " + str(e))
  79. return False
  80. def loadCheckpoint(self, name, gvt):
  81. """
  82. Reconstruct the server from a checkpoint.
  83. :param name: name of the checkpoint
  84. :param gvt: the GVT to restore to
  85. """
  86. rank = self.name
  87. #assert debug("Accessing file " + str("%s_%s_%s.pdc" % (name, gvt, rank)))
  88. infile = open("%s_%s_%s.pdc" % (name, gvt, rank), 'r')
  89. self.kernel = pickle.load(infile)
  90. self.kernel.server = self
  91. from pypdevs.MPIRedirect import LocalRedirect
  92. self.proxies[self.name] = LocalRedirect(self)
  93. infile.close()
  94. #assert debug("Closing file")
  95. self.kernel.loadCheckpoint()
  96. def setPickledData(self, pickled_data):
  97. """
  98. Set the pickled representation of the model.
  99. For use on the controller itself, as this doesn't need to unpickle the model.
  100. :param pickled_data: the pickled model
  101. """
  102. self.kernel.pickled_model = pickled_data
  103. def prepare(self, scheduler):
  104. """
  105. Prepare the server to receive the complete model over MPI
  106. :param scheduler: the scheduler to use
  107. """
  108. data = middleware.COMM_WORLD.bcast(None, root=0)
  109. if data is not None:
  110. self.saveAndProcessModel(data, scheduler)
  111. middleware.COMM_WORLD.barrier()
  112. def saveAndProcessModel(self, pickled_model, scheduler):
  113. """
  114. Receive the model and set it on the server, but also saves it for further reinitialisation.
  115. :param pickled_model: pickled representation of the model
  116. :param scheduler: the scheduler to use
  117. """
  118. self.sendModel(pickle.loads(pickled_model), scheduler)
  119. self.kernel.pickled_model = pickled_model
  120. def getName(self):
  121. """
  122. Returns the name of the server
  123. Is practically useless, since the server is previously addressed using its name. This does have a use as a ping function though.
  124. """
  125. # Actually more of a ping function...
  126. return self.name
  127. # All calls to this server are likely to be forwarded to the currently
  128. # active simulation kernel, so provide an easy forwarder
  129. def __getattr__(self, name):
  130. """
  131. Remote calls happen on the server object, though it is different from the simulation kernel itself. Therefore, forward the actual function call to the correct kernel.
  132. :param name: the name of the method to call
  133. :returns: requested attribute
  134. """
  135. # For accesses that are actually meant for the currently running kernel
  136. if name in Server.noforward:
  137. raise AttributeError()
  138. return getattr(self.kernel, name)
  139. def processMPI(self, data, comm, remote):
  140. """
  141. Process an incomming MPI message and reply to it if necessary
  142. :param data: the data that was received
  143. :param comm: the MPI COMM object
  144. :param remote: the location from where the message was received
  145. """
  146. # Receiving a new request
  147. resend_tag = data[0]
  148. function = data[1]
  149. args = data[2]
  150. kwargs = data[3]
  151. result = getattr(self, function)(*args, **kwargs)
  152. if resend_tag is not None:
  153. if result is None:
  154. result = 0
  155. comm.send(result, dest=remote, tag=resend_tag)
  156. def listenMPI(self):
  157. """
  158. Listen for incomming MPI messages and process them as soon as they are received
  159. """
  160. comm = middleware.COMM_WORLD
  161. status = middleware.MPI.Status()
  162. while 1:
  163. #assert debug("[" + str(comm.Get_rank()) + "]Listening to remote " + str(middleware.MPI.ANY_SOURCE) + " -- " + str(middleware.MPI.ANY_TAG))
  164. # First check if a message is present, otherwise we would have to do busy polling
  165. data = comm.recv(source=middleware.MPI.ANY_SOURCE,
  166. tag=middleware.MPI.ANY_TAG, status=status)
  167. tag = status.Get_tag()
  168. #assert debug("Got data from " + str(status.Get_source()) + " (" + str(status.Get_tag()) + "): " + str(data))
  169. if tag == 0:
  170. # Flush all waiters, as we will never receive an answer when we close the receiver...
  171. self.finishWaitingPool()
  172. break
  173. elif tag == 1:
  174. # NOTE Go back to listening ASAP, so do the processing on another thread
  175. if data[1] == "receive" or data[1] == "receiveAntiMessages":
  176. self.threadpool.addTask(Server.processMPI,
  177. self,
  178. list(data),
  179. comm,
  180. status.Get_source())
  181. else:
  182. # Normal 'control' commands are immediately executed, as they would otherwise have the potential to deadlock the node
  183. threading.Thread(target=Server.processMPI,
  184. args=[self,
  185. list(data),
  186. comm,
  187. status.Get_source()]
  188. ).start()
  189. else:
  190. # Receiving an answer to a previous request
  191. try:
  192. event = MPIRedirect.waiting[tag]
  193. MPIRedirect.waiting[tag] = data
  194. event.set()
  195. except KeyError:
  196. # Probably processed elsewhere already, just skip
  197. pass
  198. except AttributeError:
  199. # Key was already set elsewhere
  200. pass
  201. def finishWaitingPool(self):
  202. """
  203. Stop the complete MPI request queue from blocking, used when stopping simulation is necessary while requests are still outstanding.
  204. """
  205. for i in MPIRedirect.waiting:
  206. try:
  207. i.set()
  208. except AttributeError:
  209. # It was not a lock...
  210. pass
  211. except KeyError:
  212. # It was deleted in the meantime
  213. pass
  214. def bootMPI(self):
  215. """
  216. Boot the MPI receivers when necessary, on an other thread to prevent blocking
  217. """
  218. if self.size > 1:
  219. listener = threading.Thread(target=Server.listenMPI, args=[self])
  220. # Make sure that this is a daemon on the controller, as otherwise this thread will prevent the atexit from stopping
  221. # Though on every other node this should NOT be a daemon, as this is the only part still running
  222. if middleware.COMM_WORLD.Get_rank() == 0:
  223. listener.daemon = True
  224. listener.start()
  225. def sendModel(self, data, scheduler):
  226. """
  227. Receive a complete model and set it.
  228. :param data: a tuple containing the model, the model_ids dictionary, scheduler name, and a flag for whether or not the model was flattened to allow pickling
  229. :param scheduler: the scheduler to use
  230. """
  231. model, model_ids, flattened = data
  232. if self.name == 0:
  233. self.kernel = Controller(self.name, model, self)
  234. else:
  235. self.kernel = BaseSimulator(self.name, model, self)
  236. self.kernel.sendModel(model, model_ids, scheduler, flattened)
  237. def finish(self):
  238. """
  239. Stop the currently running simulation
  240. """
  241. sim = self.kernel
  242. with sim.simlock:
  243. # Shut down all threads on the topmost simulator
  244. sim.finished = True
  245. sim.should_run.set()
  246. self.finishWaitingPool()
  247. # Wait until they are done
  248. sim.sim_finish.wait()
  249. def queueMessage(self, time, model_id, action):
  250. """
  251. Queue a delayed action from being sent, to make it possible to batch them.
  252. Will raise an exception if previous messages form a different time were not yet flushed!
  253. This flushing is not done automatically, as otherwise the data would be received at a further timestep
  254. which causes problems with the GVT algorithm.
  255. :param time: the time at which the action happens
  256. :param model_id: the model_id that executed the action
  257. :param action: the action to execute (as a string)
  258. """
  259. if self.queued_time is None:
  260. self.queued_time = time
  261. elif time != self.queued_time:
  262. raise DEVSException("Queued message at wrong time! Probably forgot a flush")
  263. self.queued_messages.append([model_id, action])
  264. def flushQueuedMessages(self):
  265. """
  266. Flush all queued messages to the controller. This will block until all of them are queued.
  267. It is required to flush all messages right after all of them happened and this should happen within the critical section!
  268. """
  269. if self.queued_time is not None:
  270. self.getProxy(0).massDelayedActions(self.queued_time,
  271. self.queued_messages)
  272. self.queued_messages = []
  273. self.queued_time = None