||
- """
- Generated by Statechart compiler by Glenn De Jonghe, Joeri Exelmans, Simon Van Mierlo, and Yentl Van Tendeloo (for the inspiration)
- Date: Tue Oct 11 11:56:10 2016
- Model author: Sadaf Mustafiz and Claudio Gomes and Simon Van Mierlo
- Model name: CBDSimulator
- Model description:
- SCCD HUTN model of a CBD simulator
- """
- from sccd.runtime.statecharts_core import *
- from sccd.runtime.libs.ui import *
- from sccd.runtime.libs.utils import *
- from CBD_Controller import CBDController
- import Options
- # package "CBDSimulator"
class CBDSimulator(RuntimeClassBase):
    """Statechart class that drives the simulation of a CBD model.

    The statechart has a parallel state ``/Main`` with two regions:

    * ``/Main/SimulationState`` tracks the Waiting / Running / Stopped mode.
    * ``/Main/SimulationFlow`` performs the per-iteration work: initialize,
      check termination, build the dependency graph, isolate the strong
      components and compute each of them in turn.

    ``/SimulationComplete`` is entered from ``/Main`` once the
    ``SimulationState`` region is in ``Stopped``.
    """

    def __init__(self, controller, options, model):
        """Create the simulator instance.

        Parameters:
            controller: forwarded to ``RuntimeClassBase.__init__``.
            options: object providing ``getDeltaT()`` and
                ``getMaxIterations()``.
            model: the model handed to ``CBDController`` when the
                ``Initialize`` state is entered.
        """
        RuntimeClassBase.__init__(self, controller)

        self.semantics.big_step_maximality = StatechartSemantics.TakeMany
        self.semantics.internal_event_lifeline = StatechartSemantics.Queue
        self.semantics.input_event_lifeline = StatechartSemantics.FirstComboStep
        self.semantics.priority = StatechartSemantics.SourceParent
        self.semantics.concurrency = StatechartSemantics.Single

        # build Statechart structure
        self.build_statechart_structure()

        # user defined attributes
        self.iteration = None
        self.cbdController = None
        self.delta = None
        self.clock = None
        self.model = None
        self.depGraph = None
        self.strongComponentList = None
        self.currentCompIdx = None

        # call user defined constructor
        CBDSimulator.user_defined_constructor(self, options, model)

    def user_defined_constructor(self, options, model):
        """Store the options and model and read the time step from the options."""
        self.options = options
        self.delta = self.options.getDeltaT()
        self.model = model

    def user_defined_destructor(self):
        """User defined destructor; intentionally does nothing."""
        pass

    # user defined method
    def endCondition(self):
        """Return True once ``iteration`` has reached ``options.getMaxIterations()``."""
        return self.iteration >= self.options.getMaxIterations()

    # user defined method
    def advanceTime(self):
        """Advance one step: bump ``iteration``, add ``delta`` to ``clock``
        and call ``advanceTimeStep()`` on the CBD controller."""
        self.iteration = self.iteration + 1
        self.clock = self.clock + self.delta
        self.cbdController.advanceTimeStep()

    # user defined method
    def currentComponentIsCycle(self):
        """Return the controller's ``componentIsCycle`` result for the strong
        component at ``currentCompIdx``, given the current dependency graph."""
        return self.cbdController.componentIsCycle(self.strongComponentList[self.currentCompIdx], self.depGraph)

    # user defined method
    def hasNextStrongComponent(self):
        """Return True if a strong component exists after ``currentCompIdx``."""
        return (self.currentCompIdx + 1) < len(self.strongComponentList)

    # builds Statechart structure
    def build_statechart_structure(self):
        """Create all states, wire the state tree, and register the transitions.

        The statements are kept in generator order: states are created with
        their numeric ids first, then attached to their parents, then
        ``fixTree()`` is called on the root, then default states are set,
        and finally the transitions are added.
        """

        # state <root>
        self.states[""] = State(0, self)

        # state /Main
        self.states["/Main"] = ParallelState(1, self)
        self.states["/Main"].setEnter(self._Main_enter)
        self.states["/Main"].setExit(self._Main_exit)

        # state /Main/SimulationState
        self.states["/Main/SimulationState"] = State(2, self)

        # state /Main/SimulationState/Waiting
        self.states["/Main/SimulationState/Waiting"] = State(3, self)
        self.states["/Main/SimulationState/Waiting"].setEnter(self._Main_SimulationState_Waiting_enter)
        self.states["/Main/SimulationState/Waiting"].setExit(self._Main_SimulationState_Waiting_exit)

        # state /Main/SimulationState/Running
        self.states["/Main/SimulationState/Running"] = State(4, self)
        self.states["/Main/SimulationState/Running"].setEnter(self._Main_SimulationState_Running_enter)
        self.states["/Main/SimulationState/Running"].setExit(self._Main_SimulationState_Running_exit)

        # state /Main/SimulationState/Running/Continuous
        self.states["/Main/SimulationState/Running/Continuous"] = State(5, self)

        # state /Main/SimulationState/Stopped
        self.states["/Main/SimulationState/Stopped"] = State(6, self)
        self.states["/Main/SimulationState/Stopped"].setEnter(self._Main_SimulationState_Stopped_enter)
        self.states["/Main/SimulationState/Stopped"].setExit(self._Main_SimulationState_Stopped_exit)

        # state /Main/SimulationFlow
        self.states["/Main/SimulationFlow"] = State(7, self)

        # state /Main/SimulationFlow/Initialize
        self.states["/Main/SimulationFlow/Initialize"] = State(8, self)
        self.states["/Main/SimulationFlow/Initialize"].setEnter(self._Main_SimulationFlow_Initialize_enter)

        # state /Main/SimulationFlow/CheckTerminationCondition
        self.states["/Main/SimulationFlow/CheckTerminationCondition"] = State(9, self)

        # state /Main/SimulationFlow/CreateDependencyGraph
        self.states["/Main/SimulationFlow/CreateDependencyGraph"] = State(10, self)
        self.states["/Main/SimulationFlow/CreateDependencyGraph"].setEnter(self._Main_SimulationFlow_CreateDependencyGraph_enter)

        # state /Main/SimulationFlow/IsolateStrongComponents
        self.states["/Main/SimulationFlow/IsolateStrongComponents"] = State(11, self)
        self.states["/Main/SimulationFlow/IsolateStrongComponents"].setEnter(self._Main_SimulationFlow_IsolateStrongComponents_enter)

        # state /Main/SimulationFlow/ExecuteSimulationStep
        self.states["/Main/SimulationFlow/ExecuteSimulationStep"] = State(12, self)

        # state /Main/SimulationFlow/ExecuteSimulationStep/CheckNextComponent
        self.states["/Main/SimulationFlow/ExecuteSimulationStep/CheckNextComponent"] = State(13, self)

        # state /Main/SimulationFlow/ExecuteSimulationStep/CheckCycle
        self.states["/Main/SimulationFlow/ExecuteSimulationStep/CheckCycle"] = State(14, self)

        # state /SimulationComplete
        self.states["/SimulationComplete"] = State(15, self)

        # add children
        self.states[""].addChild(self.states["/Main"])
        self.states[""].addChild(self.states["/SimulationComplete"])
        self.states["/Main"].addChild(self.states["/Main/SimulationState"])
        self.states["/Main"].addChild(self.states["/Main/SimulationFlow"])
        self.states["/Main/SimulationState"].addChild(self.states["/Main/SimulationState/Waiting"])
        self.states["/Main/SimulationState"].addChild(self.states["/Main/SimulationState/Running"])
        self.states["/Main/SimulationState"].addChild(self.states["/Main/SimulationState/Stopped"])
        self.states["/Main/SimulationState/Running"].addChild(self.states["/Main/SimulationState/Running/Continuous"])
        self.states["/Main/SimulationFlow"].addChild(self.states["/Main/SimulationFlow/Initialize"])
        self.states["/Main/SimulationFlow"].addChild(self.states["/Main/SimulationFlow/CheckTerminationCondition"])
        self.states["/Main/SimulationFlow"].addChild(self.states["/Main/SimulationFlow/CreateDependencyGraph"])
        self.states["/Main/SimulationFlow"].addChild(self.states["/Main/SimulationFlow/IsolateStrongComponents"])
        self.states["/Main/SimulationFlow"].addChild(self.states["/Main/SimulationFlow/ExecuteSimulationStep"])
        self.states["/Main/SimulationFlow/ExecuteSimulationStep"].addChild(self.states["/Main/SimulationFlow/ExecuteSimulationStep/CheckNextComponent"])
        self.states["/Main/SimulationFlow/ExecuteSimulationStep"].addChild(self.states["/Main/SimulationFlow/ExecuteSimulationStep/CheckCycle"])
        self.states[""].fixTree()
        self.states[""].default_state = self.states["/Main"]
        self.states["/Main/SimulationState"].default_state = self.states["/Main/SimulationState/Waiting"]
        self.states["/Main/SimulationState/Running"].default_state = self.states["/Main/SimulationState/Running/Continuous"]
        self.states["/Main/SimulationFlow"].default_state = self.states["/Main/SimulationFlow/Initialize"]
        self.states["/Main/SimulationFlow/ExecuteSimulationStep"].default_state = self.states["/Main/SimulationFlow/ExecuteSimulationStep/CheckNextComponent"]

        # transition /Main/SimulationState/Waiting
        # Waiting -> Running/Continuous on event "continuous" from port "user_input".
        _Main_SimulationState_Waiting_0 = Transition(self, self.states["/Main/SimulationState/Waiting"], [self.states["/Main/SimulationState/Running/Continuous"]])
        _Main_SimulationState_Waiting_0.setTrigger(Event("continuous", "user_input"))
        self.states["/Main/SimulationState/Waiting"].addTransition(_Main_SimulationState_Waiting_0)

        # transition /Main/SimulationFlow/Initialize
        _Main_SimulationFlow_Initialize_0 = Transition(self, self.states["/Main/SimulationFlow/Initialize"], [self.states["/Main/SimulationFlow/CheckTerminationCondition"]])
        _Main_SimulationFlow_Initialize_0.setTrigger(None)
        self.states["/Main/SimulationFlow/Initialize"].addTransition(_Main_SimulationFlow_Initialize_0)

        # transition /Main/SimulationFlow/CheckTerminationCondition
        # Guarded: only proceeds while SimulationState is in Running.
        _Main_SimulationFlow_CheckTerminationCondition_0 = Transition(self, self.states["/Main/SimulationFlow/CheckTerminationCondition"], [self.states["/Main/SimulationFlow/CreateDependencyGraph"]])
        _Main_SimulationFlow_CheckTerminationCondition_0.setTrigger(None)
        _Main_SimulationFlow_CheckTerminationCondition_0.setGuard(self._Main_SimulationFlow_CheckTerminationCondition_0_guard)
        self.states["/Main/SimulationFlow/CheckTerminationCondition"].addTransition(_Main_SimulationFlow_CheckTerminationCondition_0)

        # transition /Main/SimulationFlow/CreateDependencyGraph
        _Main_SimulationFlow_CreateDependencyGraph_0 = Transition(self, self.states["/Main/SimulationFlow/CreateDependencyGraph"], [self.states["/Main/SimulationFlow/IsolateStrongComponents"]])
        _Main_SimulationFlow_CreateDependencyGraph_0.setTrigger(None)
        self.states["/Main/SimulationFlow/CreateDependencyGraph"].addTransition(_Main_SimulationFlow_CreateDependencyGraph_0)

        # transition /Main/SimulationFlow/IsolateStrongComponents
        _Main_SimulationFlow_IsolateStrongComponents_0 = Transition(self, self.states["/Main/SimulationFlow/IsolateStrongComponents"], [self.states["/Main/SimulationFlow/ExecuteSimulationStep"]])
        _Main_SimulationFlow_IsolateStrongComponents_0.setAction(self._Main_SimulationFlow_IsolateStrongComponents_0_exec)
        _Main_SimulationFlow_IsolateStrongComponents_0.setTrigger(None)
        self.states["/Main/SimulationFlow/IsolateStrongComponents"].addTransition(_Main_SimulationFlow_IsolateStrongComponents_0)

        # transition /Main/SimulationFlow/ExecuteSimulationStep/CheckNextComponent
        # Transition 0 moves on to the next component; transition 1 (the
        # complementary guard) advances time and returns to the termination check.
        _Main_SimulationFlow_ExecuteSimulationStep_CheckNextComponent_0 = Transition(self, self.states["/Main/SimulationFlow/ExecuteSimulationStep/CheckNextComponent"], [self.states["/Main/SimulationFlow/ExecuteSimulationStep/CheckCycle"]])
        _Main_SimulationFlow_ExecuteSimulationStep_CheckNextComponent_0.setAction(self._Main_SimulationFlow_ExecuteSimulationStep_CheckNextComponent_0_exec)
        _Main_SimulationFlow_ExecuteSimulationStep_CheckNextComponent_0.setTrigger(None)
        _Main_SimulationFlow_ExecuteSimulationStep_CheckNextComponent_0.setGuard(self._Main_SimulationFlow_ExecuteSimulationStep_CheckNextComponent_0_guard)
        self.states["/Main/SimulationFlow/ExecuteSimulationStep/CheckNextComponent"].addTransition(_Main_SimulationFlow_ExecuteSimulationStep_CheckNextComponent_0)
        _Main_SimulationFlow_ExecuteSimulationStep_CheckNextComponent_1 = Transition(self, self.states["/Main/SimulationFlow/ExecuteSimulationStep/CheckNextComponent"], [self.states["/Main/SimulationFlow/CheckTerminationCondition"]])
        _Main_SimulationFlow_ExecuteSimulationStep_CheckNextComponent_1.setAction(self._Main_SimulationFlow_ExecuteSimulationStep_CheckNextComponent_1_exec)
        _Main_SimulationFlow_ExecuteSimulationStep_CheckNextComponent_1.setTrigger(None)
        _Main_SimulationFlow_ExecuteSimulationStep_CheckNextComponent_1.setGuard(self._Main_SimulationFlow_ExecuteSimulationStep_CheckNextComponent_1_guard)
        self.states["/Main/SimulationFlow/ExecuteSimulationStep/CheckNextComponent"].addTransition(_Main_SimulationFlow_ExecuteSimulationStep_CheckNextComponent_1)

        # transition /Main/SimulationFlow/ExecuteSimulationStep/CheckCycle
        # Both transitions return to CheckNextComponent; they differ only in
        # the action taken (single block vs. algebraic loop).
        _Main_SimulationFlow_ExecuteSimulationStep_CheckCycle_0 = Transition(self, self.states["/Main/SimulationFlow/ExecuteSimulationStep/CheckCycle"], [self.states["/Main/SimulationFlow/ExecuteSimulationStep/CheckNextComponent"]])
        _Main_SimulationFlow_ExecuteSimulationStep_CheckCycle_0.setAction(self._Main_SimulationFlow_ExecuteSimulationStep_CheckCycle_0_exec)
        _Main_SimulationFlow_ExecuteSimulationStep_CheckCycle_0.setTrigger(None)
        _Main_SimulationFlow_ExecuteSimulationStep_CheckCycle_0.setGuard(self._Main_SimulationFlow_ExecuteSimulationStep_CheckCycle_0_guard)
        self.states["/Main/SimulationFlow/ExecuteSimulationStep/CheckCycle"].addTransition(_Main_SimulationFlow_ExecuteSimulationStep_CheckCycle_0)
        _Main_SimulationFlow_ExecuteSimulationStep_CheckCycle_1 = Transition(self, self.states["/Main/SimulationFlow/ExecuteSimulationStep/CheckCycle"], [self.states["/Main/SimulationFlow/ExecuteSimulationStep/CheckNextComponent"]])
        _Main_SimulationFlow_ExecuteSimulationStep_CheckCycle_1.setAction(self._Main_SimulationFlow_ExecuteSimulationStep_CheckCycle_1_exec)
        _Main_SimulationFlow_ExecuteSimulationStep_CheckCycle_1.setTrigger(None)
        _Main_SimulationFlow_ExecuteSimulationStep_CheckCycle_1.setGuard(self._Main_SimulationFlow_ExecuteSimulationStep_CheckCycle_1_guard)
        self.states["/Main/SimulationFlow/ExecuteSimulationStep/CheckCycle"].addTransition(_Main_SimulationFlow_ExecuteSimulationStep_CheckCycle_1)

        # transition /Main
        # /Main -> /SimulationComplete on the "_0after" event, guarded on Stopped.
        _Main_0 = Transition(self, self.states["/Main"], [self.states["/SimulationComplete"]])
        _Main_0.setTrigger(Event("_0after"))
        _Main_0.setGuard(self._Main_0_guard)
        self.states["/Main"].addTransition(_Main_0)

        # transition /Main/SimulationState/Running
        _Main_SimulationState_Running_0 = Transition(self, self.states["/Main/SimulationState/Running"], [self.states["/Main/SimulationState/Stopped"]])
        _Main_SimulationState_Running_0.setAction(self._Main_SimulationState_Running_0_exec)
        _Main_SimulationState_Running_0.setTrigger(None)
        _Main_SimulationState_Running_0.setGuard(self._Main_SimulationState_Running_0_guard)
        self.states["/Main/SimulationState/Running"].addTransition(_Main_SimulationState_Running_0)

    def _Main_enter(self):
        """Enter action of /Main: register timer 0 with the value of ``sccd_yield()``."""
        # NOTE(review): presumably this timer produces the "_0after" event that
        # triggers the /Main transition - confirm against the SCCD runtime.
        self.addTimer(0, self.sccd_yield())

    def _Main_exit(self):
        """Exit action of /Main: remove timer 0."""
        self.removeTimer(0)

    # The enter/exit hooks below only trace state changes. The parenthesised
    # single-argument print is valid as a Python 2 statement and as a
    # Python 3 function call, and prints the same text in both.
    def _Main_SimulationState_Running_enter(self):
        print('entering SimulationState/Running')

    def _Main_SimulationState_Running_exit(self):
        print('exiting SimulationState/Running')

    def _Main_SimulationState_Waiting_enter(self):
        print('entering SimulationState/Waiting')

    def _Main_SimulationState_Waiting_exit(self):
        print('exiting SimulationState/Waiting')

    def _Main_SimulationState_Stopped_enter(self):
        print('entering SimulationState/Stopped')

    def _Main_SimulationState_Stopped_exit(self):
        print('exiting SimulationState/Stopped')

    def _Main_SimulationFlow_Initialize_enter(self):
        """Reset iteration and clock, create the CBD controller and initialise it."""
        self.iteration = 0
        self.clock = 0
        self.cbdController = CBDController(self.model, self.delta)
        self.cbdController.initSimulation()

    def _Main_SimulationFlow_CreateDependencyGraph_enter(self):
        """Ask the controller for the dependency graph of the current iteration."""
        self.depGraph = self.cbdController.createDepGraph(self.iteration)

    def _Main_SimulationFlow_IsolateStrongComponents_enter(self):
        """Ask the controller for the strong components of the dependency graph."""
        self.strongComponentList = self.cbdController.createStrongComponents(self.depGraph, self.iteration)

    def _Main_0_guard(self, parameters):
        """Guard: the SimulationState region is in Stopped."""
        return self.inState(["/Main/SimulationState/Stopped"])

    def _Main_SimulationState_Running_0_exec(self, parameters):
        """Action of Running -> Stopped: raise internal event "termination_condition"."""
        self.raiseInternalEvent(Event("termination_condition", None, []))

    def _Main_SimulationState_Running_0_guard(self, parameters):
        """Guard: the end condition has been reached."""
        return self.endCondition()

    def _Main_SimulationFlow_CheckTerminationCondition_0_guard(self, parameters):
        """Guard: the SimulationState region is in Running."""
        return self.inState(["/Main/SimulationState/Running"])

    def _Main_SimulationFlow_IsolateStrongComponents_0_exec(self, parameters):
        """Reset the component index."""
        # -1 so that the first CheckNextComponent increment selects index 0.
        self.currentCompIdx = -1

    def _Main_SimulationFlow_ExecuteSimulationStep_CheckNextComponent_0_exec(self, parameters):
        """Select the next strong component."""
        self.currentCompIdx = self.currentCompIdx + 1

    def _Main_SimulationFlow_ExecuteSimulationStep_CheckNextComponent_0_guard(self, parameters):
        """Guard: another strong component remains."""
        return self.hasNextStrongComponent()

    def _Main_SimulationFlow_ExecuteSimulationStep_CheckNextComponent_1_exec(self, parameters):
        """All components handled: advance simulated time."""
        self.advanceTime()

    def _Main_SimulationFlow_ExecuteSimulationStep_CheckNextComponent_1_guard(self, parameters):
        """Guard: no strong component remains."""
        return not self.hasNextStrongComponent()

    def _Main_SimulationFlow_ExecuteSimulationStep_CheckCycle_0_exec(self, parameters):
        """Compute the current component through ``computeNextBlock``."""
        self.cbdController.computeNextBlock(self.strongComponentList[self.currentCompIdx], self.iteration)

    def _Main_SimulationFlow_ExecuteSimulationStep_CheckCycle_0_guard(self, parameters):
        """Guard: the current component is not a cycle."""
        return not self.currentComponentIsCycle()

    def _Main_SimulationFlow_ExecuteSimulationStep_CheckCycle_1_exec(self, parameters):
        """Compute the current component through ``computeNextAlgebraicLoop``."""
        self.cbdController.computeNextAlgebraicLoop(self.strongComponentList[self.currentCompIdx], self.iteration)

    def _Main_SimulationFlow_ExecuteSimulationStep_CheckCycle_1_guard(self, parameters):
        """Guard: the current component is a cycle."""
        return self.currentComponentIsCycle()

    def initializeStatechart(self):
        """Set the default targets to those of /Main, then run the base initialisation."""
        # enter default state
        self.default_targets = self.states["/Main"].getEffectiveTargetStates()
        RuntimeClassBase.initializeStatechart(self)
class ObjectManager(ObjectManagerBase):
    """Object manager that knows how to build ``CBDSimulator`` instances."""

    def __init__(self, controller):
        """Forward ``controller`` to ``ObjectManagerBase.__init__``."""
        ObjectManagerBase.__init__(self, controller)

    def instantiate(self, class_name, construct_params):
        """Create and return an instance of ``class_name``.

        Only "CBDSimulator" is supported; its constructor receives the first
        two entries of ``construct_params``. Any other class name raises
        ``Exception``.
        """
        # Reject unknown class names up front.
        if not (class_name == "CBDSimulator"):
            raise Exception("Cannot instantiate class " + class_name)

        simulator = CBDSimulator(self.controller, construct_params[0], construct_params[1])
        simulator.associations = {}
        return simulator
class Controller(ThreadsControllerBase):
    """Threaded controller that hosts a single ``CBDSimulator`` instance."""

    def __init__(self, options, model, keep_running = None, behind_schedule_callback = None):
        """Set up the controller, its ports and the simulator instance.

        Parameters:
            options: forwarded to the ``CBDSimulator`` constructor.
            model: forwarded to the ``CBDSimulator`` constructor.
            keep_running: forwarded to ``ThreadsControllerBase.__init__``;
                ``None`` is replaced by ``True``.
            behind_schedule_callback: forwarded unchanged to
                ``ThreadsControllerBase.__init__`` (``None`` stays ``None``).
        """
        # Identity comparison is the correct test for the None sentinel.
        if keep_running is None:
            keep_running = True
        ThreadsControllerBase.__init__(self, ObjectManager(self), keep_running, behind_schedule_callback)
        self.addInputPort("user_input")
        # NOTE(review): "user_output" is registered through addInputPort;
        # confirm against the model whether an output port was intended.
        self.addInputPort("user_output")
        self.object_manager.createInstance("CBDSimulator", [options, model])
|