""" Generated by Statechart compiler by Glenn De Jonghe, Joeri Exelmans, Simon Van Mierlo, and Yentl Van Tendeloo (for the inspiration) and Sam Pieters (DEVS) Model author: Simon Van Mierlo Model name: Timer (Eventloop Version) """ from sccd.runtime.DEVS_statecharts_core import * from pypdevs.DEVS import * from pypdevs.infinity import * from pypdevs.simulator import * from sccd.runtime.libs.ui import ui from time import time # package "Timer (Eventloop Version)" class MainAppInstance(RuntimeClassBase): def __init__(self, atomdevs): RuntimeClassBase.__init__(self, atomdevs) self.associations = {} self.semantics.big_step_maximality = StatechartSemantics.TakeMany self.semantics.internal_event_lifeline = StatechartSemantics.Queue self.semantics.input_event_lifeline = StatechartSemantics.FirstComboStep self.semantics.priority = StatechartSemantics.SourceParent self.semantics.concurrency = StatechartSemantics.Single # build Statechart structure self.build_statechart_structure() # call user defined constructor MainAppInstance.user_defined_constructor(self) def user_defined_constructor(self): self.canvas = ui.append_canvas(ui.window,100,100,{'background':'#222222'}) self.clock_text = self.canvas.element.create_text(25,25,{'text':'0.0'}) self.actual_clock_text = self.canvas.element.create_text(25,50,{'text':'0.0'}) interrupt_button = ui.append_button(ui.window, 'INTERRUPT'); continue_button = ui.append_button(ui.window, 'CONTINUE'); ui.bind_event(interrupt_button.element, ui.EVENTS.MOUSE_CLICK, self.controller, 'interrupt_clicked'); ui.bind_event(continue_button.element, ui.EVENTS.MOUSE_CLICK, self.controller, 'continue_clicked'); def user_defined_destructor(self): pass # user defined method def update_timers(self): self.canvas.element.itemconfigure(self.clock_text, text=str('%.2f' % (self.getSimulatedTime() / 1000.0))) self.canvas.element.itemconfigure(self.actual_clock_text, text='%.2f' % (time() / 1000.0)) # builds Statechart structure def build_statechart_structure(self): # state self.states[""] = State(0, "", self) # state /running self.states["/running"] = State(1, "/running", self) self.states["/running"].setEnter(self._running_enter) self.states["/running"].setExit(self._running_exit) # state /interrupted self.states["/interrupted"] = State(2, "/interrupted", self) # add children self.states[""].addChild(self.states["/running"]) self.states[""].addChild(self.states["/interrupted"]) self.states[""].fixTree() self.states[""].default_state = self.states["/running"] # transition /running _running_0 = Transition(self, self.states["/running"], [self.states["/running"]]) _running_0.setAction(self._running_0_exec) _running_0.setTrigger(Event("_0after")) self.states["/running"].addTransition(_running_0) _running_1 = Transition(self, self.states["/running"], [self.states["/interrupted"]]) _running_1.setAction(self._running_1_exec) _running_1.setTrigger(Event("interrupt_clicked", self.getInPortName("ui"))) self.states["/running"].addTransition(_running_1) # transition /interrupted _interrupted_0 = Transition(self, self.states["/interrupted"], [self.states["/interrupted"]]) _interrupted_0.setAction(self._interrupted_0_exec) _interrupted_0.setTrigger(Event("interrupt_clicked", self.getInPortName("ui"))) self.states["/interrupted"].addTransition(_interrupted_0) _interrupted_1 = Transition(self, self.states["/interrupted"], [self.states["/running"]]) _interrupted_1.setAction(self._interrupted_1_exec) _interrupted_1.setTrigger(Event("continue_clicked", self.getInPortName("ui"))) self.states["/interrupted"].addTransition(_interrupted_1) def _running_enter(self): self.addTimer(0, 0.05) def _running_exit(self): self.removeTimer(0) def _running_0_exec(self, parameters): self.update_timers() def _running_1_exec(self, parameters): self.update_timers() def _interrupted_0_exec(self, parameters): self.update_timers() def _interrupted_1_exec(self, parameters): self.update_timers() def initializeStatechart(self): # enter default state self.default_targets = self.states["/running"].getEffectiveTargetStates() RuntimeClassBase.initializeStatechart(self) class MainApp(AtomicDEVS, ObjectManagerBase): def __init__(self, name): AtomicDEVS.__init__(self, name) ObjectManagerBase.__init__(self) self.elapsed = 0 self.name = "MainApp" self.obj_manager_out = self.addOutPort("obj_manager_out") self.outputs = {} self.obj_manager_in = self.addInPort("obj_manager_in") self.input = self.addInPort("input") self.instances.append(MainAppInstance(self)) self.next_time = INFINITY def extTransition(self, inputs): self.simulated_time = (self.simulated_time + self.elapsed) self.next_time = 0 all_inputs = [] if self.obj_manager_in in inputs: all_inputs.extend(inputs[self.obj_manager_in]) if self.input in inputs: all_inputs.extend(inputs[self.input]) for input in all_inputs: if isinstance(input, str): tem = eval(input) self.addInput(tem) elif input[2].name == "create_instance": new_instance = MainAppInstance(self) self.instances.append(new_instance) p = new_instance.associations.get("parent") if p: p.addInstance(input[2].instance) ev = Event("instance_created", None, [f"{input[2].parameters[0]}[{len(self.instances)-1}]"], input[2].instance) self.to_send.append((input[1], input[0], ev)) elif input[2].name == "start_instance": instance = self.instances[input[2].instance] instance.start() ev = Event("instance_started", None, [f"{input[0]}[{len(self.instances)-1}]"], input[2].instance) self.to_send.append((input[0], input[1], ev)) elif input[2].name == "delete_instance": pass elif input[2].name == "associate_instance": pass elif input[2].name == "disassociate_instance": pass elif input[2].name == "instance_created": instance = self.instances[input[2].instance] instance.addEvent(input[2]) elif input[2].name == "instance_started": instance = self.instances[input[2].instance] instance.addEvent(input[2]) elif input[2].name == "instance_deleted": pass elif input[2].name == "instance_associated": pass elif input[2].name == "instance_disassociated": pass else: ev = input[2] self.addInput(ev) return self.instances def intTransition(self): earliest = min(self.getEarliestEventTime(), self.simulated_time + self.input_queue.getEarliestTime()) if not (earliest == INFINITY): self.simulated_time = earliest self.to_send = [] self.handleInput() self.stepAll() next_earliest = min(self.getEarliestEventTime(), self.input_queue.getEarliestTime()) if not (len(self.to_send) == 0): self.next_time = 0 elif next_earliest == INFINITY: self.next_time = INFINITY else: self.next_time = next_earliest - earliest return self.instances def outputFnc(self): to_dict = {} for sending in self.to_send: if sending[2].port == None: if self.obj_manager_out in to_dict: to_dict[self.obj_manager_out].append(sending) else: to_dict[self.obj_manager_out] = [sending] else: the_port = None for port in self.OPorts: if port.name == sending[0]: the_port = port if the_port in to_dict: to_dict[the_port].append(sending) else: to_dict[the_port] = [sending] return to_dict def timeAdvance(self): return self.next_time class ObjectManagerState: def __init__(self): self.to_send = [(None, "MainApp", Event("start_instance", None, [0], 0))] class ObjectManager(AtomicDEVS): def __init__(self, name): AtomicDEVS.__init__(self, name) self.State = ObjectManagerState() self.input = self.addInPort("input") self.output = {} self.output["MainApp"] = self.addOutPort() def extTransition(self, inputs): all_inputs = inputs[self.input] for input in all_inputs: self.State.to_send.append(input) return self.State def intTransition(self): self.State.to_send = [] return self.State def outputFnc(self): out_dict = {} for (source, target, message) in self.State.to_send: if self.output[target] in out_dict: out_dict[self.output[target]].append((source, target, message)) else: out_dict[self.output[target]] = [(source, target, message)] return out_dict def timeAdvance(self): if self.State.to_send: return 0 return INFINITY class Controller(CoupledDEVS): def __init__(self, name): CoupledDEVS.__init__(self, name) self.ui = self.addInPort("ui") self.objectmanager = self.addSubModel(ObjectManager("ObjectManager")) self.atomic0 = self.addSubModel(MainApp("MainApp")) self.connectPorts(self.atomic0.obj_manager_out, self.objectmanager.input) self.connectPorts(self.objectmanager.output["MainApp"], self.atomic0.obj_manager_in)