""" Generated by Statechart compiler by Glenn De Jonghe, Joeri Exelmans, Simon Van Mierlo, and Yentl Van Tendeloo (for the inspiration) and Sam Pieters (DEVS) Model author: Simon Van Mierlo Model name: Timer (Threaded Version) """ from sccd.runtime.DEVS_statecharts_core import * import time # package "Timer (Threaded Version)" class MainAppInstance(RuntimeClassBase): def __init__(self, atomdevs, id, start_port_id): RuntimeClassBase.__init__(self, atomdevs, id) self.semantics.big_step_maximality = StatechartSemantics.TakeMany self.semantics.internal_event_lifeline = StatechartSemantics.Queue self.semantics.input_event_lifeline = StatechartSemantics.FirstComboStep self.semantics.priority = StatechartSemantics.SourceParent self.semantics.concurrency = StatechartSemantics.Single # build Statechart structure self.build_statechart_structure() # call user defined constructor MainAppInstance.user_defined_constructor(self) port_name = addInputPort("input", start_port_id, True) atomdevs.state.port_mappings[port_name] = id port_name = addInputPort("", start_port_id) atomdevs.state.port_mappings[port_name] = id def user_defined_constructor(self): self.starting_time = time.time() def user_defined_destructor(self): pass # user defined method def print_simulated_time(self): print('SIMTIME = %.2f' % (self.getSimulatedTime() / 1000.0)) # user defined method def print_wct_time(self): print('ACTTIME = %.2f' % (time.time() - self.starting_time)) # builds Statechart structure def build_statechart_structure(self): # state self.states[""] = State(0, "", self) # state /running self.states["/running"] = ParallelState(1, "/running", self) # state /running/print_simulated_time self.states["/running/print_simulated_time"] = State(2, "/running/print_simulated_time", self) # state /running/print_simulated_time/print_simulated_time self.states["/running/print_simulated_time/print_simulated_time"] = State(3, "/running/print_simulated_time/print_simulated_time", self) self.states["/running/print_simulated_time/print_simulated_time"].setEnter(self._running_print_simulated_time_print_simulated_time_enter) self.states["/running/print_simulated_time/print_simulated_time"].setExit(self._running_print_simulated_time_print_simulated_time_exit) # state /running/print_wct_time self.states["/running/print_wct_time"] = State(4, "/running/print_wct_time", self) # state /running/print_wct_time/print_wct_time self.states["/running/print_wct_time/print_wct_time"] = State(5, "/running/print_wct_time/print_wct_time", self) self.states["/running/print_wct_time/print_wct_time"].setEnter(self._running_print_wct_time_print_wct_time_enter) self.states["/running/print_wct_time/print_wct_time"].setExit(self._running_print_wct_time_print_wct_time_exit) # state /interrupted self.states["/interrupted"] = State(6, "/interrupted", self) # add children self.states[""].addChild(self.states["/running"]) self.states[""].addChild(self.states["/interrupted"]) self.states["/running"].addChild(self.states["/running/print_simulated_time"]) self.states["/running"].addChild(self.states["/running/print_wct_time"]) self.states["/running/print_simulated_time"].addChild(self.states["/running/print_simulated_time/print_simulated_time"]) self.states["/running/print_wct_time"].addChild(self.states["/running/print_wct_time/print_wct_time"]) self.states[""].fixTree() self.states[""].default_state = self.states["/running"] self.states["/running/print_simulated_time"].default_state = self.states["/running/print_simulated_time/print_simulated_time"] self.states["/running/print_wct_time"].default_state = self.states["/running/print_wct_time/print_wct_time"] # transition /running/print_simulated_time/print_simulated_time _running_print_simulated_time_print_simulated_time_0 = Transition(self, self.states["/running/print_simulated_time/print_simulated_time"], [self.states["/running/print_simulated_time/print_simulated_time"]]) _running_print_simulated_time_print_simulated_time_0.setAction(self._running_print_simulated_time_print_simulated_time_0_exec) _running_print_simulated_time_print_simulated_time_0.setTrigger(Event("_0after")) self.states["/running/print_simulated_time/print_simulated_time"].addTransition(_running_print_simulated_time_print_simulated_time_0) # transition /running/print_wct_time/print_wct_time _running_print_wct_time_print_wct_time_0 = Transition(self, self.states["/running/print_wct_time/print_wct_time"], [self.states["/running/print_wct_time/print_wct_time"]]) _running_print_wct_time_print_wct_time_0.setAction(self._running_print_wct_time_print_wct_time_0_exec) _running_print_wct_time_print_wct_time_0.setTrigger(Event("_1after")) self.states["/running/print_wct_time/print_wct_time"].addTransition(_running_print_wct_time_print_wct_time_0) # transition /interrupted _interrupted_0 = Transition(self, self.states["/interrupted"], [self.states["/interrupted"]]) _interrupted_0.setAction(self._interrupted_0_exec) _interrupted_0.setTrigger(Event("interrupt", self.getInPortName("input"))) self.states["/interrupted"].addTransition(_interrupted_0) _interrupted_1 = Transition(self, self.states["/interrupted"], [self.states["/running"]]) _interrupted_1.setAction(self._interrupted_1_exec) _interrupted_1.setTrigger(Event("continue", self.getInPortName("input"))) self.states["/interrupted"].addTransition(_interrupted_1) # transition /running _running_0 = Transition(self, self.states["/running"], [self.states["/interrupted"]]) _running_0.setAction(self._running_0_exec) _running_0.setTrigger(Event("interrupt", self.getInPortName("input"))) self.states["/running"].addTransition(_running_0) # transition /running/print_simulated_time _running_print_simulated_time_0 = Transition(self, self.states["/running/print_simulated_time"], [self.states["/running/print_simulated_time"]]) _running_print_simulated_time_0.setAction(self._running_print_simulated_time_0_exec) _running_print_simulated_time_0.setTrigger(Event("interrupt", self.getInPortName("input"))) self.states["/running/print_simulated_time"].addTransition(_running_print_simulated_time_0) def _running_print_simulated_time_print_simulated_time_enter(self): self.addTimer(0, 0.05) def _running_print_simulated_time_print_simulated_time_exit(self): self.removeTimer(0) def _running_print_wct_time_print_wct_time_enter(self): self.addTimer(1, 0.05) def _running_print_wct_time_print_wct_time_exit(self): self.removeTimer(1) def _running_0_exec(self, parameters): self.print_simulated_time() self.print_wct_time() def _running_print_simulated_time_0_exec(self, parameters): print('going nowhere') def _running_print_simulated_time_print_simulated_time_0_exec(self, parameters): self.print_simulated_time() def _running_print_wct_time_print_wct_time_0_exec(self, parameters): self.print_wct_time() def _interrupted_0_exec(self, parameters): self.print_simulated_time() self.print_wct_time() def _interrupted_1_exec(self, parameters): self.print_simulated_time() self.print_wct_time() def initializeStatechart(self): # enter default state self.default_targets = self.states["/running"].getEffectiveTargetStates() RuntimeClassBase.initializeStatechart(self) class MainApp(ClassBase): def __init__(self, name): ClassBase.__init__(self, name) self.input = self.addInPort("input") new_instance = self.constructObject(0, 0, []) self.state.instances[new_instance.instance_id] = new_instance new_instance.start() self.state.next_time = 0 def constructObject(self, id, start_port_id, parameters): new_instance = MainAppInstance(self, id, start_port_id) return new_instance def instantiate(self, class_name, construct_params): instance = {} instance["name"] = class_name if class_name == "MainApp": self.narrow_cast_id = self.narrow_cast_id + 0 instance["associations"] = {} else: raise Exception("Cannot instantiate class " + class_name) return instance ObjectManagerState.instantiate = instantiate class ObjectManager(ObjectManagerBase): def __init__(self, name): ObjectManagerBase.__init__(self, name) self.state = ObjectManagerState() self.input = self.addInPort("input") self.output["MainApp"] = self.addOutPort() self.state.createInstance("MainApp", []) class Controller(CoupledDEVS): def __init__(self, name): CoupledDEVS.__init__(self, name) self.in_input = self.addInPort("input") self.objectmanager = self.addSubModel(ObjectManager("ObjectManager")) self.atomics = [] self.atomics.append(self.addSubModel(MainApp("MainApp"))) self.connectPorts(self.atomics[0].obj_manager_out, self.objectmanager.input) self.connectPorts(self.objectmanager.output["MainApp"], self.atomics[0].obj_manager_in) self.connectPorts(self.in_input, self.atomics[0].input)