# messageScheduler.py
# Copyright 2014 Modelling, Simulation and Design Lab (MSDL) at
# McGill University and the University of Antwerp (http://msdl.cs.mcgill.ca/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""
Scheduler for external input messages
"""
from heapq import heappop, heappush, heapify

from pypdevs.logger import *
  20. class MessageScheduler(object):
  21. """
  22. An efficient implementation of a message scheduler for the inputQueue,
  23. it supports very fast invalidations (O(1)) and fast retrievals of first
  24. element (O(log(n) in average case)
  25. """
  26. def __init__(self):
  27. """
  28. Constructor.
  29. """
  30. # List of processed messages
  31. self.processed = []
  32. # Heap of the to be processed messages
  33. self.heap = []
  34. # All invalidated messages, simply adding a message's UUID will invalidate
  35. # the message. The counter that it keeps is for multiple invalidations
  36. self.invalids = set()
  37. def __getstate__(self):
  38. """
  39. For pickling
  40. """
  41. retdict = {}
  42. unpicklable = frozenset(["instancemethod", "lock", "_Event"])
  43. for i in dir(self):
  44. if getattr(self, i).__class__.__name__ in unpicklable:
  45. # unpicklable, so don't copy it
  46. continue
  47. elif str(i).startswith("__"):
  48. continue
  49. else:
  50. retdict[str(i)] = getattr(self, i)
  51. return retdict
  52. def insert(self, extraction, model_list):
  53. """
  54. Insert several messages that were created elsewhere and merge them in.
  55. :param extraction: the output of the extract method on the other message scheduler
  56. :param model_list: models that are inserted and for which extraction happened
  57. """
  58. msgs, invalids = extraction
  59. # A simple update suffices, as these messages have a unique ID
  60. self.invalids |= invalids
  61. for msg in msgs:
  62. moddata = {}
  63. for entry in msg.content:
  64. inport = model_list[entry[0]].ports[entry[1]]
  65. moddata[inport] = msg.content[entry]
  66. # Overwrite the original message
  67. msg.content = moddata
  68. self.schedule(msg)
  69. def extract(self, model_ids):
  70. """
  71. Extract messages from the message scheduler for when a model gets removed from this kernel.
  72. :param model_ids: iterable of model_ids of models that will be removed from this node
  73. :returns: tuple -- extraction that needs to be passed to the insert method of another scheduler
  74. """
  75. new_heap = []
  76. extracted = []
  77. for msg in self.heap:
  78. for port in msg.content:
  79. if port.host_DEVS.model_id in model_ids:
  80. msg.content = {(i.host_DEVS.model_id, i.port_id):
  81. msg.content[i]
  82. for i in msg.content}
  83. extracted.append(msg)
  84. else:
  85. new_heap.append(msg)
  86. # Break, as this was simply done for a python 2 and python 3 compliant version
  87. break
  88. heapify(new_heap)
  89. self.heap = new_heap
  90. return (extracted, self.invalids)
  91. def schedule(self, msg):
  92. """
  93. Schedule a message for processing
  94. :param msg: the message to schedule
  95. """
  96. try:
  97. self.invalids.remove(msg.uuid)
  98. except KeyError:
  99. heappush(self.heap, msg)
  100. def massUnschedule(self, uuids):
  101. """
  102. Unschedule several messages, this way it will no longer be processed.
  103. :param uuids: iterable of UUIDs that need to be removed
  104. """
  105. self.invalids = self.invalids.union(uuids)
  106. def readFirst(self):
  107. """
  108. Returns the first (valid) message. Not necessarily O(1), as it could be
  109. the case that a lot of invalid messages are still to be deleted.
  110. """
  111. self.cleanFirst()
  112. return self.heap[0]
  113. def removeFirst(self):
  114. """
  115. Notify that the first (valid) message is processed.
  116. :returns: msg -- the next first message that is valid
  117. """
  118. self.cleanFirst()
  119. self.processed.append(heappop(self.heap))
  120. def purgeFirst(self):
  121. """
  122. Notify that the first (valid) message must be removed
  123. :returns: msg -- the next first message that is valid
  124. """
  125. self.cleanFirst()
  126. heappop(self.heap)
  127. def cleanFirst(self):
  128. """
  129. Clean all invalid messages at the front of the list. Method MUST be called
  130. before any accesses should happen to the first element, otherwise this
  131. first element might be a message that was just invalidated
  132. """
  133. try:
  134. while 1:
  135. self.invalids.remove(self.heap[0].uuid)
  136. # If it got removed, it means that the message was indeed invalidated, so we can simply pop it
  137. heappop(self.heap)
  138. except (KeyError, IndexError):
  139. # Seems that the UUID was not invalidated, so we are done
  140. # OR
  141. # Reached the end of the heap and all were invalid
  142. pass
  143. def revert(self, time):
  144. """
  145. Revert the inputqueue to the specified time, will also clean up the list of processed elements
  146. :param time: time to which revertion should happen
  147. """
  148. try:
  149. i = 0
  150. while self.processed[i].timestamp < time:
  151. i += 1
  152. for msg in self.processed[i:]:
  153. # All processed messages were valid, so no need for the more expensive check
  154. # Should an invalidation for a processed message have just arrived, it will
  155. # be processed AFTER this revertion, thus using the normal unschedule() function
  156. heappush(self.heap, msg)
  157. self.processed = self.processed[:i]
  158. except IndexError:
  159. # All elements are smaller
  160. pass
  161. def cleanup(self, time):
  162. """
  163. Clean up the processed list, also removes all invalid elements
  164. :param time: time up to which cleanups are allowed to happen
  165. """
  166. # We can be absolutely certain that ONLY elements from the processed list should be deleted
  167. self.processed = [i for i in self.processed if i.timestamp >= time]
  168. # Clean up the dictionary too, as otherwise it will start to contain a massive amount of entries, consuming both memory and increasing the amortized worst case