# fsasimulator_debugging.py
"""
Generated by Statechart compiler by Glenn De Jonghe, Joeri Exelmans, Simon Van Mierlo, and Yentl Van Tendeloo (for the inspiration)
Date: Wed Oct 12 13:55:43 2016
Model author: Sadaf Mustafiz and Bruno Barroca and Claudio Gomes and Simon Van Mierlo
Model name: FSASimulator
Model description:
A debuggable FSA simulator. It supports 'after' triggers and events.
"""
  9. from sccd.runtime.statecharts_core import *
  10. from sccd.runtime.libs.ui import *
  11. from sccd.runtime.libs.utils import *
  12. import fsaclasses
  13. import sccd.runtime.accurate_time as accurate_time
  14. class Breakpoint:
  15. def __init__(self, name, function, enabled, disable_on_trigger):
  16. self.name = name
  17. self.function = function
  18. self.enabled = enabled
  19. self.disable_on_trigger = disable_on_trigger
  20. # package "FSASimulator"
  21. class FSASimulator(RuntimeClassBase):
  22. def __init__(self, controller, amodel, events):
  23. RuntimeClassBase.__init__(self, controller)
  24. self.semantics.big_step_maximality = StatechartSemantics.TakeMany
  25. self.semantics.internal_event_lifeline = StatechartSemantics.Queue
  26. self.semantics.input_event_lifeline = StatechartSemantics.FirstComboStep
  27. self.semantics.priority = StatechartSemantics.SourceParent
  28. self.semantics.concurrency = StatechartSemantics.Single
  29. # build Statechart structure
  30. self.build_statechart_structure()
  31. # user defined attributes
  32. self.elapsed = None
  33. self.super_step_completed = None
  34. self.model = None
  35. self.logicalTime = None
  36. self.timestep = None
  37. self.currentEvent = None
  38. self.selectedTransition = None
  39. self.eventList = None
  40. self.currentState = None
  41. # call user defined constructor
  42. FSASimulator.user_defined_constructor(self, amodel, events)
  43. def user_defined_constructor(self, amodel, events):
  44. self.model = amodel
  45. self.eventList = events
  46. def user_defined_destructor(self):
  47. pass
  48. # user defined method
  49. def processEvent(self, event):
  50. if (event != None):
  51. self.eventList.popEvent(event)
  52. event.processed = True
  53. # user defined method
  54. def getInputEventAt(self, time):
  55. return self.eventList.getInputAt(time)
  56. # user defined method
  57. def initialize(self):
  58. self.logicalTime = 0
  59. self.elapsed = 0
  60. self.timestep = 1.0
  61. self.currentState = self.model.initialState
  62. # user defined method
  63. def initializeDebugger(self):
  64. self.breakpoints = []
  65. self.triggered_bp = None
  66. # user defined method
  67. def endCondition(self):
  68. return self.currentState.final
  69. # user defined method
  70. def advanceTime(self):
  71. self.logicalTime = self.logicalTime + self.timestep
  72. self.elapsed = self.elapsed + self.timestep
  73. # user defined method
  74. def addBreakpoint(self, name, function, enabled = None, disable_on_trigger = None):
  75. if enabled == None: enabled = true
  76. if disable_on_trigger == None: disable_on_trigger = true
  77. if len([bp for bp in self.breakpoints if bp.name == name]) > 0:
  78. return -1
  79. self.breakpoints.append(Breakpoint(name, function, enabled, disable_on_trigger))
  80. return 0
  81. # user defined method
  82. def delBreakpoint(self, name):
  83. if len([bp for bp in self.breakpoints if bp.name == name]) == 0:
  84. return -1
  85. self.breakpoints = [bp for bp in self.breakpoints if bp.name != name]
  86. return 0
  87. # user defined method
  88. def toggleBreakpoint(self, name):
  89. if len([bp for bp in self.breakpoints if bp.name == name]) == 0:
  90. return -1
  91. for bp in self.breakpoints:
  92. if bp.name == name:
  93. bp.enabled = enabled
  94. break
  95. return 0
  96. # user defined method
  97. def breakpointTriggers(self, is_realtime_simulation):
  98. self.triggered_bp = None
  99. for bp in self.breakpoints:
  100. if not bp.enabled:
  101. continue
  102. # include the function in the scope...
  103. exec(bp.function)
  104. # ... and execute it, note that the breakpoint thus has to start with "def breakpoint("
  105. # note that we pass self.time_next instead of self.simulated_time in the case of as-fast-as-possible simulation (or stepping)
  106. # this is to make sure that the simulation is stopped BEFORE the specified time is reached, and not AFTER (because we don't necessarily implement 'step back')
  107. # in case of realtime simulation, we do pass the current simulated time, since we can stop at (more or less) exactly the right time
  108. if breakpoint({'clock': (self.clock if is_realtime_simulation else self.time_next) / 1000.0, 'state': self.state}):
  109. # triggered!
  110. self.triggered_bp = bp.name
  111. if bp.disable_on_trigger:
  112. bp.enabled = False
  113. return True
  114. else:
  115. # not triggered, so continue
  116. continue
  117. return False
  118. # user defined method
  119. def godEvent(self, state_var, new_val):
  120. pass
  121. # builds Statechart structure
  122. def build_statechart_structure(self):
  123. # state <root>
  124. self.states[""] = State(0, self)
  125. # state /CheckTermination
  126. self.states["/CheckTermination"] = State(1, self)
  127. # state /CheckTermination/MacroStepProcessed
  128. self.states["/CheckTermination/MacroStepProcessed"] = State(2, self)
  129. # state /DoSimulation
  130. self.states["/DoSimulation"] = State(3, self)
  131. # state /DoSimulation/MicroStepPrepared
  132. self.states["/DoSimulation/MicroStepPrepared"] = State(4, self)
  133. # state /DoSimulation/MicroStepProcessed
  134. self.states["/DoSimulation/MicroStepProcessed"] = State(5, self)
  135. # state /DoSimulation/MacroStepPrepared
  136. self.states["/DoSimulation/MacroStepPrepared"] = State(6, self)
  137. # state /Initialized
  138. self.states["/Initialized"] = State(7, self)
  139. # state /Started
  140. self.states["/Started"] = State(8, self)
  141. # state /SimulationComplete
  142. self.states["/SimulationComplete"] = State(9, self)
  143. # add children
  144. self.states[""].addChild(self.states["/CheckTermination"])
  145. self.states[""].addChild(self.states["/DoSimulation"])
  146. self.states[""].addChild(self.states["/Initialized"])
  147. self.states[""].addChild(self.states["/Started"])
  148. self.states[""].addChild(self.states["/SimulationComplete"])
  149. self.states["/CheckTermination"].addChild(self.states["/CheckTermination/MacroStepProcessed"])
  150. self.states["/DoSimulation"].addChild(self.states["/DoSimulation/MicroStepPrepared"])
  151. self.states["/DoSimulation"].addChild(self.states["/DoSimulation/MicroStepProcessed"])
  152. self.states["/DoSimulation"].addChild(self.states["/DoSimulation/MacroStepPrepared"])
  153. self.states[""].fixTree()
  154. self.states[""].default_state = self.states["/Started"]
  155. self.states["/CheckTermination"].default_state = self.states["/CheckTermination/MacroStepProcessed"]
  156. self.states["/DoSimulation"].default_state = self.states["/DoSimulation/MacroStepPrepared"]
  157. # transition /CheckTermination/MacroStepProcessed
  158. _CheckTermination_MacroStepProcessed_0 = Transition(self, self.states["/CheckTermination/MacroStepProcessed"], [self.states["/SimulationComplete"]])
  159. _CheckTermination_MacroStepProcessed_0.setAction(self._CheckTermination_MacroStepProcessed_0_exec)
  160. _CheckTermination_MacroStepProcessed_0.setTrigger(None)
  161. _CheckTermination_MacroStepProcessed_0.setGuard(self._CheckTermination_MacroStepProcessed_0_guard)
  162. self.states["/CheckTermination/MacroStepProcessed"].addTransition(_CheckTermination_MacroStepProcessed_0)
  163. _CheckTermination_MacroStepProcessed_1 = Transition(self, self.states["/CheckTermination/MacroStepProcessed"], [self.states["/DoSimulation"]])
  164. _CheckTermination_MacroStepProcessed_1.setAction(self._CheckTermination_MacroStepProcessed_1_exec)
  165. _CheckTermination_MacroStepProcessed_1.setTrigger(None)
  166. _CheckTermination_MacroStepProcessed_1.setGuard(self._CheckTermination_MacroStepProcessed_1_guard)
  167. self.states["/CheckTermination/MacroStepProcessed"].addTransition(_CheckTermination_MacroStepProcessed_1)
  168. # transition /DoSimulation/MicroStepPrepared
  169. _DoSimulation_MicroStepPrepared_0 = Transition(self, self.states["/DoSimulation/MicroStepPrepared"], [self.states["/DoSimulation/MicroStepProcessed"]])
  170. _DoSimulation_MicroStepPrepared_0.setAction(self._DoSimulation_MicroStepPrepared_0_exec)
  171. _DoSimulation_MicroStepPrepared_0.setTrigger(None)
  172. self.states["/DoSimulation/MicroStepPrepared"].addTransition(_DoSimulation_MicroStepPrepared_0)
  173. # transition /DoSimulation/MicroStepProcessed
  174. _DoSimulation_MicroStepProcessed_0 = Transition(self, self.states["/DoSimulation/MicroStepProcessed"], [self.states["/DoSimulation/MicroStepPrepared"]])
  175. _DoSimulation_MicroStepProcessed_0.setAction(self._DoSimulation_MicroStepProcessed_0_exec)
  176. _DoSimulation_MicroStepProcessed_0.setTrigger(None)
  177. _DoSimulation_MicroStepProcessed_0.setGuard(self._DoSimulation_MicroStepProcessed_0_guard)
  178. self.states["/DoSimulation/MicroStepProcessed"].addTransition(_DoSimulation_MicroStepProcessed_0)
  179. _DoSimulation_MicroStepProcessed_1 = Transition(self, self.states["/DoSimulation/MicroStepProcessed"], [self.states["/CheckTermination"]])
  180. _DoSimulation_MicroStepProcessed_1.setAction(self._DoSimulation_MicroStepProcessed_1_exec)
  181. _DoSimulation_MicroStepProcessed_1.setTrigger(None)
  182. _DoSimulation_MicroStepProcessed_1.setGuard(self._DoSimulation_MicroStepProcessed_1_guard)
  183. self.states["/DoSimulation/MicroStepProcessed"].addTransition(_DoSimulation_MicroStepProcessed_1)
  184. # transition /DoSimulation/MacroStepPrepared
  185. _DoSimulation_MacroStepPrepared_0 = Transition(self, self.states["/DoSimulation/MacroStepPrepared"], [self.states["/DoSimulation/MicroStepProcessed"]])
  186. _DoSimulation_MacroStepPrepared_0.setAction(self._DoSimulation_MacroStepPrepared_0_exec)
  187. _DoSimulation_MacroStepPrepared_0.setTrigger(None)
  188. self.states["/DoSimulation/MacroStepPrepared"].addTransition(_DoSimulation_MacroStepPrepared_0)
  189. # transition /Initialized
  190. _Initialized_0 = Transition(self, self.states["/Initialized"], [self.states["/CheckTermination"]])
  191. _Initialized_0.setAction(self._Initialized_0_exec)
  192. _Initialized_0.setTrigger(None)
  193. self.states["/Initialized"].addTransition(_Initialized_0)
  194. # transition /Started
  195. _Started_0 = Transition(self, self.states["/Started"], [self.states["/Initialized"]])
  196. _Started_0.setAction(self._Started_0_exec)
  197. _Started_0.setTrigger(None)
  198. self.states["/Started"].addTransition(_Started_0)
  199. def _CheckTermination_MacroStepProcessed_0_exec(self, parameters):
  200. print('Going to End... ')
  201. def _CheckTermination_MacroStepProcessed_0_guard(self, parameters):
  202. return self.endCondition()
  203. def _CheckTermination_MacroStepProcessed_1_exec(self, parameters):
  204. print('Going to DoSimulation and reading events... ')
  205. self.currentEvent = self.getInputEventAt(self.logicalTime)
  206. self.selectedTransition = self.model.getTransitionFrom(self.currentState, self.currentEvent, self.elapsed)
  207. print(self.currentEvent)
  208. print(self.selectedTransition)
  209. def _CheckTermination_MacroStepProcessed_1_guard(self, parameters):
  210. return not self.endCondition()
  211. def _DoSimulation_MicroStepPrepared_0_exec(self, parameters):
  212. print('Going to MicroStepProcessed, taking transition and reading events... ')
  213. print('Transition to be taken: ' + str(self.selectedTransition))
  214. self.currentState = self.selectedTransition.target
  215. self.elapsed = 0
  216. self.processEvent(self.currentEvent)
  217. print('New state: ' + str(self.currentState))
  218. self.currentEvent = self.getInputEventAt(self.logicalTime)
  219. self.selectedTransition = self.model.getTransitionFrom(self.currentState, self.currentEvent, self.elapsed)
  220. print(self.currentEvent)
  221. print(self.selectedTransition)
  222. def _DoSimulation_MicroStepProcessed_0_exec(self, parameters):
  223. print('Going to MicroStepPrepared... ')
  224. def _DoSimulation_MicroStepProcessed_0_guard(self, parameters):
  225. return self.selectedTransition != None
  226. def _DoSimulation_MicroStepProcessed_1_exec(self, parameters):
  227. print('Going to CheckTermination and advancing time... ')
  228. self.advanceTime()
  229. print(self.logicalTime)
  230. print(self.elapsed)
  231. def _DoSimulation_MicroStepProcessed_1_guard(self, parameters):
  232. return self.selectedTransition == None
  233. def _DoSimulation_MacroStepPrepared_0_exec(self, parameters):
  234. print('Going to MicroStepProcessed... ')
  235. def _Initialized_0_exec(self, parameters):
  236. print('Going to MacroStepProcessed... ')
  237. def _Started_0_exec(self, parameters):
  238. print('Going to Initialized... ')
  239. self.initialize()
  240. def initializeStatechart(self):
  241. # enter default state
  242. self.default_targets = self.states["/Started"].getEffectiveTargetStates()
  243. RuntimeClassBase.initializeStatechart(self)
  244. class ObjectManager(ObjectManagerBase):
  245. def __init__(self, controller):
  246. ObjectManagerBase.__init__(self, controller)
  247. def instantiate(self, class_name, construct_params):
  248. if class_name == "FSASimulator":
  249. instance = FSASimulator(self.controller, construct_params[0], construct_params[1])
  250. instance.associations = {}
  251. else:
  252. raise Exception("Cannot instantiate class " + class_name)
  253. return instance
  254. class Controller(ThreadsControllerBase):
  255. def __init__(self, amodel, events, keep_running = None, behind_schedule_callback = None):
  256. if keep_running == None: keep_running = True
  257. if behind_schedule_callback == None: behind_schedule_callback = None
  258. ThreadsControllerBase.__init__(self, ObjectManager(self), keep_running, behind_schedule_callback)
  259. self.addInputPort("user_input")
  260. self.addInputPort("user_output")
  261. self.object_manager.createInstance("FSASimulator", [amodel, events])