| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275 |
- """
- Generated by Statechart compiler by Glenn De Jonghe, Joeri Exelmans, Simon Van Mierlo, and Yentl Van Tendeloo (for the inspiration) and Sam Pieters (DEVS)
- Model author: Simon Van Mierlo
- Model name: Timer (Eventloop Version)
- """
- from sccd.runtime.DEVS_statecharts_core import *
- from pypdevs.DEVS import *
- from pypdevs.infinity import *
- from pypdevs.simulator import *
- from sccd.runtime.libs.ui import ui
- from time import time
# Width and height handed to ui.append_canvas when MainAppInstance builds
# its drawing surface; the text items are positioned relative to them.
CANVAS_WIDTH, CANVAS_HEIGHT = 500, 250

# Size used in the ("TkDefaultFont", FONT_SIZE) font of both clock texts and
# as the vertical offset between the first and the second text item.
FONT_SIZE = 50
- # package "Timer (Eventloop Version)"
class MainAppInstance(RuntimeClassBase):
    """Statechart instance behind the timer window.

    The chart has a root with two children: ``/running`` (the default state)
    and ``/interrupted``. Every transition action calls ``update_timers``:

    * ``/running`` -> ``/running`` on ``_0after``
    * ``/running`` -> ``/interrupted`` on ``interrupt_clicked`` (in-port "ui")
    * ``/interrupted`` -> ``/interrupted`` on ``interrupt_clicked`` (in-port "ui")
    * ``/interrupted`` -> ``/running`` on ``continue_clicked`` (in-port "ui")
    """

    def __init__(self, atomdevs):
        """Configure semantics, build the chart and run the user constructor.

        :param atomdevs: passed straight through to ``RuntimeClassBase.__init__``.
        """
        RuntimeClassBase.__init__(self, atomdevs)
        self.associations = {}

        # Execution semantics selected for this chart.
        semantics = self.semantics
        semantics.big_step_maximality = StatechartSemantics.TakeMany
        semantics.internal_event_lifeline = StatechartSemantics.Queue
        semantics.input_event_lifeline = StatechartSemantics.FirstComboStep
        semantics.priority = StatechartSemantics.SourceParent
        semantics.concurrency = StatechartSemantics.Single

        self.build_statechart_structure()

        # Called through the class so that exactly this constructor runs.
        MainAppInstance.user_defined_constructor(self)

    def user_defined_constructor(self):
        """Create the canvas, the two '0.0' text items and the two buttons."""
        centre_x = CANVAS_WIDTH / 2
        centre_y = CANVAS_HEIGHT / 2
        text_font = ("TkDefaultFont", FONT_SIZE)

        self.canvas = ui.append_canvas(
            ui.window, CANVAS_WIDTH, CANVAS_HEIGHT, {'background': '#222222'}
        )
        # First readout: centred on the canvas.
        self.clock_text = self.canvas.element.create_text(
            centre_x, centre_y, text='0.0', anchor='center', font=text_font
        )
        # Second readout: FONT_SIZE below the first one.
        self.actual_clock_text = self.canvas.element.create_text(
            centre_x, centre_y + FONT_SIZE, text='0.0', anchor='center', font=text_font
        )

        interrupt_button = ui.append_button(ui.window, 'INTERRUPT')
        continue_button = ui.append_button(ui.window, 'CONTINUE')
        # The click bindings below are disabled in the generated source:
        # ui.bind_event(interrupt_button.element, ui.EVENTS.MOUSE_CLICK, self.controller, 'interrupt_clicked')
        # ui.bind_event(continue_button.element, ui.EVENTS.MOUSE_CLICK, self.controller, 'continue_clicked')

    def user_defined_destructor(self):
        """Nothing to clean up."""
        pass

    # user defined method
    def update_timers(self):
        """Refresh both text items on the canvas.

        The first shows ``getSimulatedTime() / 1000.0`` and the second
        ``time() / 1000.0``, each formatted with two decimals.
        """
        canvas_widget = self.canvas.element
        simulated_label = '%.2f' % (self.getSimulatedTime() / 1000.0)
        canvas_widget.itemconfigure(self.clock_text, text=simulated_label)
        # NOTE(review): `time` is time.time (imported at file top), which
        # reports seconds; the / 1000.0 mirrors the simulated-time line --
        # confirm the intended unit.
        wallclock_label = '%.2f' % (time() / 1000.0)
        canvas_widget.itemconfigure(self.actual_clock_text, text=wallclock_label)

    # builds Statechart structure
    def build_statechart_structure(self):
        """Register the states and transitions of the chart in ``self.states``."""
        # state <root>
        root = State(0, "", self)
        self.states[""] = root

        # state /running (the only state with enter/exit actions)
        running = State(1, "/running", self)
        self.states["/running"] = running
        running.setEnter(self._running_enter)
        running.setExit(self._running_exit)

        # state /interrupted
        interrupted = State(2, "/interrupted", self)
        self.states["/interrupted"] = interrupted

        # hierarchy: both states hang under the root, /running is the default
        root.addChild(running)
        root.addChild(interrupted)
        root.fixTree()
        root.default_state = running

        # transitions leaving /running
        tick = Transition(self, running, [running])
        tick.setAction(self._running_0_exec)
        tick.setTrigger(Event("_0after"))
        running.addTransition(tick)

        pause = Transition(self, running, [interrupted])
        pause.setAction(self._running_1_exec)
        pause.setTrigger(Event("interrupt_clicked", self.getInPortName("ui")))
        running.addTransition(pause)

        # transitions leaving /interrupted
        stay_paused = Transition(self, interrupted, [interrupted])
        stay_paused.setAction(self._interrupted_0_exec)
        stay_paused.setTrigger(Event("interrupt_clicked", self.getInPortName("ui")))
        interrupted.addTransition(stay_paused)

        resume = Transition(self, interrupted, [running])
        resume.setAction(self._interrupted_1_exec)
        resume.setTrigger(Event("continue_clicked", self.getInPortName("ui")))
        interrupted.addTransition(resume)

    def _running_enter(self):
        """Enter action of /running: register timer 0 with a value of 0.05."""
        self.addTimer(0, 0.05)

    def _running_exit(self):
        """Exit action of /running: drop timer 0 again."""
        self.removeTimer(0)

    def _running_0_exec(self, parameters):
        """Action of the ``_0after`` self-transition on /running."""
        self.update_timers()

    def _running_1_exec(self, parameters):
        """Action of /running -> /interrupted."""
        self.update_timers()

    def _interrupted_0_exec(self, parameters):
        """Action of the ``interrupt_clicked`` self-transition on /interrupted."""
        self.update_timers()

    def _interrupted_1_exec(self, parameters):
        """Action of /interrupted -> /running."""
        self.update_timers()

    def initializeStatechart(self):
        """Record the default targets (from /running) and defer to the base class."""
        # enter default state
        self.default_targets = self.states["/running"].getEffectiveTargetStates()
        RuntimeClassBase.initializeStatechart(self)
class MainApp(AtomicDEVS, ObjectManagerBase):
    """Atomic DEVS model that hosts the ``MainAppInstance`` statecharts.

    Messages arrive on ``obj_manager_in`` and ``input``; outgoing messages are
    collected in ``self.to_send`` and emitted by ``outputFnc``. Apart from
    plain strings, a message is indexed as ``message[0]``, ``message[1]`` and
    ``message[2]``, where ``message[2]`` is the event object.
    """

    def __init__(self, name):
        """Create the ports and the first ``MainAppInstance``.

        :param name: model name forwarded to ``AtomicDEVS.__init__``.
        """
        AtomicDEVS.__init__(self, name)
        ObjectManagerBase.__init__(self)
        self.elapsed = 0
        self.name = "MainApp"
        self.obj_manager_out = self.addOutPort("obj_manager_out")
        self.outputs = {}
        self.obj_manager_in = self.addInPort("obj_manager_in")
        self.input = self.addInPort("input")
        self.instances.append(MainAppInstance(self))
        # Value returned by timeAdvance(); INFINITY until a transition sets it.
        self.next_time = INFINITY

    def extTransition(self, inputs):
        """Process every message received on the two in-ports.

        :param inputs: mapping from port to the list of messages on that port.
        :returns: ``self.instances``.
        """
        self.simulated_time += self.elapsed
        self.next_time = 0

        received = []
        if self.obj_manager_in in inputs:
            received.extend(inputs[self.obj_manager_in])
        if self.input in inputs:
            received.extend(inputs[self.input])

        for message in received:
            if isinstance(message, str):
                # SECURITY: a string message is evaluated as Python source.
                # Anything able to write to these ports can execute arbitrary
                # code; only feed this model strings from trusted producers.
                self.addInput(eval(message))
                continue

            event = message[2]
            kind = event.name
            if kind == "create_instance":
                new_instance = MainAppInstance(self)
                self.instances.append(new_instance)
                parent = new_instance.associations.get("parent")
                if parent:
                    parent.addInstance(event.instance)
                reply = Event(
                    "instance_created",
                    None,
                    [f"{event.parameters[0]}[{len(self.instances)-1}]"],
                    event.instance,
                )
                # The first two fields are swapped relative to the request.
                self.to_send.append((message[1], message[0], reply))
            elif kind == "start_instance":
                self.instances[event.instance].start()
                reply = Event(
                    "instance_started",
                    None,
                    [f"{message[0]}[{len(self.instances)-1}]"],
                    event.instance,
                )
                # Unlike create_instance, the first two fields keep the
                # request's order here.
                self.to_send.append((message[0], message[1], reply))
            elif kind in ("instance_created", "instance_started"):
                # Notifications are queued on the instance they refer to.
                self.instances[event.instance].addEvent(event)
            elif kind in (
                "delete_instance",
                "associate_instance",
                "disassociate_instance",
                "instance_deleted",
                "instance_associated",
                "instance_disassociated",
            ):
                # Recognised but intentionally not acted upon.
                pass
            else:
                self.addInput(event)
        return self.instances

    def intTransition(self):
        """Advance simulated time, run the instances and schedule the next step.

        :returns: ``self.instances``.
        """
        earliest = min(
            self.getEarliestEventTime(),
            self.simulated_time + self.input_queue.getEarliestTime(),
        )
        if earliest != INFINITY:
            self.simulated_time = earliest

        self.to_send = []
        self.handleInput()
        self.stepAll()

        # NOTE(review): unlike `earliest` above, the input-queue time is not
        # offset by self.simulated_time here -- confirm this is intended.
        next_earliest = min(self.getEarliestEventTime(), self.input_queue.getEarliestTime())
        if self.to_send:
            # Pending output: fire again immediately so it gets emitted.
            self.next_time = 0
        elif next_earliest == INFINITY:
            self.next_time = INFINITY
        else:
            self.next_time = next_earliest - earliest
        return self.instances

    def outputFnc(self):
        """Group the entries of ``self.to_send`` by output port.

        Entries whose event has no port go to ``obj_manager_out``; all others
        go to the out-port whose name equals the entry's first field.

        :returns: dict mapping port to the list of entries for that port.
        """
        routed = {}
        for sending in self.to_send:
            if sending[2].port is None:
                out_port = self.obj_manager_out
            else:
                # The last port with a matching name wins; when none matches
                # the entry is filed under the key None.
                out_port = None
                for port in self.OPorts:
                    if port.name == sending[0]:
                        out_port = port
            routed.setdefault(out_port, []).append(sending)
        return routed

    def timeAdvance(self):
        """Return the delay chosen by the most recent transition."""
        return self.next_time
class ObjectManagerState:
    """State object of ``ObjectManager``: the messages waiting to be sent."""

    def __init__(self):
        # The queue starts with one "start_instance" message addressed to
        # "MainApp"; it has no source (None) and its last Event argument is 0.
        start_request = Event("start_instance", None, [0], 0)
        self.to_send = [(None, "MainApp", start_request)]
class ObjectManager(AtomicDEVS):
    """Atomic DEVS model that relays queued messages to their target's port.

    Messages are ``(source, target, message)`` triples held in
    ``self.State.to_send``; ``self.output`` maps a target name to an out-port.
    """

    def __init__(self, name):
        """Create the state, the single in-port and the "MainApp" out-port.

        :param name: model name forwarded to ``AtomicDEVS.__init__``.
        """
        AtomicDEVS.__init__(self, name)
        self.State = ObjectManagerState()
        self.input = self.addInPort("input")
        self.output = {}
        self.output["MainApp"] = self.addOutPort()

    def extTransition(self, inputs):
        """Queue everything received on the ``input`` port; return the state."""
        self.State.to_send.extend(inputs[self.input])
        return self.State

    def intTransition(self):
        """Empty the queue and return the state."""
        self.State.to_send = []
        return self.State

    def outputFnc(self):
        """Return the queued triples grouped by the out-port of their target."""
        per_port = {}
        for (source, target, message) in self.State.to_send:
            destination = self.output[target]
            per_port.setdefault(destination, []).append((source, target, message))
        return per_port

    def timeAdvance(self):
        """Return 0 while messages are queued, otherwise ``INFINITY``."""
        return 0 if self.State.to_send else INFINITY
class Controller(CoupledDEVS):
    """Coupled DEVS model wiring ``ObjectManager`` and ``MainApp`` together."""

    def __init__(self, name):
        """Add both sub-models and connect them in both directions.

        :param name: model name forwarded to ``CoupledDEVS.__init__``.
        """
        CoupledDEVS.__init__(self, name)
        # NOTE(review): this in-port is created but not connected to any
        # sub-model in the lines visible here -- confirm whether a coupling
        # to MainApp's "input" port is missing.
        self.ui = self.addInPort("ui")

        self.objectmanager = manager = self.addSubModel(ObjectManager("ObjectManager"))
        self.atomic0 = app = self.addSubModel(MainApp("MainApp"))

        # MainApp -> ObjectManager
        self.connectPorts(app.obj_manager_out, manager.input)
        # ObjectManager -> MainApp
        self.connectPorts(manager.output["MainApp"], app.obj_manager_in)
|