""" Generated by Statechart compiler by Glenn De Jonghe, Joeri Exelmans, Simon Van Mierlo, and Yentl Van Tendeloo (for the inspiration) Date: Mon Oct 02 17:36:19 2017 Model author: Yentl Van Tendeloo Model name: DEVS simulator Model description: A restricted PythonPDEVS simulator modelled in SCCD """ from sccd.runtime.statecharts_core import * try: import cPickle as pickle except ImportError: import pickle as pickle import time # We basically just interface with the basesimulator from scheduler import Scheduler from DEVS import directConnect, CoupledDEVS, AtomicDEVS, RootDEVS class Breakpoint(object): def __init__(self, breakpoint_id, function, enabled, disable_on_trigger): self.id = breakpoint_id self.function = function self.enabled = enabled self.disable_on_trigger = disable_on_trigger # package "DEVS simulator" class SCCDSimulator(RuntimeClassBase): def __init__(self, controller, model): RuntimeClassBase.__init__(self, controller) self.semantics.big_step_maximality = StatechartSemantics.TakeMany self.semantics.internal_event_lifeline = StatechartSemantics.Queue self.semantics.input_event_lifeline = StatechartSemantics.FirstComboStep self.semantics.priority = StatechartSemantics.SourceParent self.semantics.concurrency = StatechartSemantics.Single # build Statechart structure self.build_statechart_structure() # call user defined constructor SCCDSimulator.user_defined_constructor(self, model) def user_defined_constructor(self, model): # Simulation variables self.termination_time = None self.termination_condition = None self.simulation_time = (0.0, 0) self.model = model self.root_model = model self.realtime_scale = 1.0 # Values to be set during simulation self.realtime_starttime = None self.inject_queue = [] # Model initialization self.model_ids = [] self.model.finalize(name="", model_counter=0, model_ids=self.model_ids, locations={None: []}, selectHierarchy=[]) # Direct connection if isinstance(self.model, CoupledDEVS): self.model.componentSet = directConnect(self.model.componentSet, True) self.model = RootDEVS(self.model.componentSet, self.model.componentSet, None) for m in self.model.componentSet: m.timeLast = (-m.elapsed, 1) ta = m.timeAdvance() m.timeNext = (m.timeLast[0] + ta, 1) m.old_states = [(m.timeLast, pickle.dumps(m.state))] elif isinstance(self.model, AtomicDEVS): for p in self.model.IPorts: p.routingInLine = [] p.routingOutLine = [] for p in self.model.OPorts: p.routingInLine = [] p.routingOutLine = [] self.model = RootDEVS([self.model], [self.model], None) self.model.timeLast = (-self.model.elapsed, 1) ta = self.model.timeAdvance() self.model.timeNext = (self.model.timeLast[0] + ta, 1) # Fixed configuration options self.model.scheduler = Scheduler(self.model.componentSet, 1e-6, len(self.model.models)) self.timeNext = self.model.scheduler.readFirst() # Cached values self.imminents = None self.outbags = None self.inbags = None self.transitioning = None self.new_states = None self.new_tn = None # Verbose trace file self.trace_file = None # Breakpoint management self.breakpoints = [] # For a reset self.save_model = {model: (model.elapsed, pickle.dumps(model.state, pickle.HIGHEST_PROTOCOL)) for model in self.model.componentSet} self.transition_times = [] def user_defined_destructor(self): pass # user defined method def serialize(self, type, object): if type == "imminents": return [m.getModelFullName() for m in object] elif type == "inbags" or type == "outbags": return {m.getPortFullName(): object[m] for m in object} elif type == "new_tn" or type == "new_states": return {m.getModelFullName(): object[m] for m in object} elif type == "transitioning": return {m.getModelFullName(): {1: "INTERNAL", 2: "EXTERNAL", 3: "CONFLUENT"}[object[m]] for m in object} # user defined method def find_port_with_name(self, name): for model in self.model.componentSet: if name.startswith(model.getModelFullName()): # Found a potential model # We can't simply check for equality, as portnames might contain dots too for port in model.IPorts: if port.getPortFullName() == name: # Now everything matches return port # Nothing found return None # user defined method def find_model_with_name(self, name): for model in self.model.componentSet: if name == model.getModelFullName(): # Found exact match return model return None # user defined method def calculate_after(self): try: # Process in parts of 100 milliseconds to repeatedly check the termination condition nexttime = (self.timeNext[0] - (time.time() - self.realtime_starttime) / self.realtime_scale) * self.realtime_scale x = min(0.1, nexttime) return x except (TypeError, AttributeError): # We are probably not simulating in realtime... return float('inf') # user defined method def parse_options(self, configuration): self.termination_condition = None if "termination_condition" not in configuration else configuration["termination_condition"] self.termination_time = None if "termination_time" not in configuration else configuration["termination_time"] self.realtime_scale = 1.0 if "realtime_scale" not in configuration else 1.0/configuration["realtime_scale"] # Subtract the current simulation time to allow for pausing self.realtime_starttime = (time.time() - self.simulation_time[0]*self.realtime_scale) # Reset the time used in the waiting, as it might not get recomputed self.the_time = 0.00001 # user defined method def should_terminate(self, realtime): # Now that it includes breakpoints, results are to be interpretted as follows: # -2 --> continue simulation # -1 --> termination condition # else --> breakpoint if realtime: check_time = self.simulation_time else: self.compute_timeNext() check_time = self.timeNext # Just access the 'transitioned' dictionary # Kind of dirty though... if self.transitioning is None: transitioned = set() else: transitioned = set(self.transitioning.keys()) if check_time[0] == float('inf'): # Always terminate if we have reached infinity terminate = True elif self.termination_condition is not None: terminate = self.termination_condition(check_time, self.root_model, transitioned) else: terminate = self.termination_time < check_time[0] if terminate: # Always terminate, so don't check breakpoints return -1 else: # Have to check breakpoints for termination for bp in self.breakpoints: if not bp.enabled: continue # Include the function in the scope exec(bp.function) # And execute it, note that the breakpoint thus has to start with "def breakpoint(" if breakpoint(check_time, self.root_model, transitioned): # Triggered! return bp.id else: # Not triggered, so continue continue return -2 # user defined method def find_internal_imminents(self): self.imminents = self.model.scheduler.getImminent(self.simulation_time) self.transition_times.append(self.simulation_time) # user defined method def compute_outputfunction(self): self.outbags = {} for model in self.imminents: self.outbags.update(model.outputFnc()) # user defined method def route_messages(self): self.inbags = {} for outport in self.outbags: payload = self.outbags[outport] for inport, z in outport.routingOutLine: if z is not None: payload = [z(pickle.loads(pickle.dumps(m))) for m in payload] self.inbags.setdefault(inport, []).extend(payload) # user defined method def process_injects(self): while self.inject_queue: if self.inject_queue[0]["time"] > self.simulation_time: break config = self.inject_queue.pop(0) portname = config["port"] event = config["event"] port = self.find_port_with_name(portname) self.inbags.setdefault(port, []).append(event) # user defined method def find_all_imminents(self): # Internal codes: # 1 --> internal transition # 2 --> external transition # 3 --> confluent transition # These codes are a legacy of efficient PyPDEVS, but is kept here for consistency self.transitioning = {model: 1 for model in self.imminents} for inport in self.inbags: aDEVS = inport.hostDEVS aDEVS.myInput[inport] = self.inbags[inport] if aDEVS in self.transitioning: self.transitioning[aDEVS] = 3 else: self.transitioning[aDEVS] = 2 # user defined method def compute_transitions(self): self.new_states = {} for aDEVS in self.transitioning: aDEVS.myInput = {key: pickle.loads(pickle.dumps(aDEVS.myInput[key], pickle.HIGHEST_PROTOCOL)) for key in aDEVS.myInput} if self.transitioning[aDEVS] == 1: aDEVS.state = aDEVS.intTransition() elif self.transitioning[aDEVS] == 2: aDEVS.elapsed = self.simulation_time[0] - aDEVS.timeLast[0] aDEVS.state = aDEVS.extTransition(aDEVS.myInput) elif self.transitioning[aDEVS] == 3: aDEVS.elapsed = 0. aDEVS.state = aDEVS.confTransition(aDEVS.myInput) aDEVS.old_states.append((self.simulation_time, pickle.dumps(aDEVS.state))) aDEVS.myInput = {} self.new_states[aDEVS] = aDEVS.state # user defined method def compute_ta(self): self.new_tn = {} t, age = self.simulation_time for aDEVS in self.transitioning: ta = aDEVS.timeAdvance() aDEVS.timeLast = self.simulation_time aDEVS.timeNext = (t + ta, 1 if ta else (age + 1)) self.new_tn[aDEVS] = aDEVS.timeNext self.trace(self.transitioning[aDEVS], aDEVS) self.model.scheduler.massReschedule(self.transitioning) self.timeNext = self.model.scheduler.readFirst() # user defined method def trace(self, type, model): type_map = {1: "INTERNAL", 2: "EXTERNAL", 3: "CONFLUENT"} type = type_map[type] if self.trace_file is not None: self.trace_file.write("%s TRANSITION in <%s> @ %s\n" % (type, model.getModelFullName(), model.timeLast[0])) self.trace_file.write(" NEW STATE <%s>\n" % (model.state)) if type != "EXTERNAL": self.trace_file.write(" OUTPUTFNC returned %s\n" % model.myOutput) elif type != "INTERNAL": self.trace_file.write(" inputs were %s\n" % model.myInput) self.trace_file.write(" timeNext: %s (ta: %s)\n" % (model.timeNext[0], model.timeNext[0] - model.timeLast[0])) # user defined method def flush_file(self): if self.trace_file is not None: self.trace_file.flush() # user defined method def process_breakpoints(self, realtime): breakpoint_id = self.should_terminate(realtime) for breakpoint in self.breakpoints: if breakpoint.id == breakpoint_id: if breakpoint.disable_on_trigger: breakpoint.enabled = False return breakpoint_id # user defined method def compute_timeNext(self): model_timeNext = self.model.scheduler.readFirst() if len(self.inject_queue) > 0: self.timeNext = min(model_timeNext, self.inject_queue[0]["time"]) else: self.timeNext = model_timeNext # user defined method def rollback_step(self): if len(self.transition_times) == 0: return new_time = self.transition_times.pop() for model in self.model.componentSet: if model.old_states[-1][0] == new_time: # Remove the current state del model.old_states[-1] # Set the new (old...) state new_state = model.old_states[-1] model.state = pickle.loads(new_state[1]) model.timeLast = new_state[0] ta = model.timeAdvance() model.timeNext = (model.timeLast[0] + ta, model.timeLast[1] + 1 if ta == 0 else 0) self.model.scheduler.massReschedule([model]) self.simulation_time = self.transition_times[-1] if len(self.transition_times) > 0 else (0.0, 0) # builds Statechart structure def build_statechart_structure(self): # state self.states[""] = State(0, "", self) # state /main self.states["/main"] = ParallelState(1, "/main", self) # state /main/injection_monitor self.states["/main/injection_monitor"] = State(2, "/main/injection_monitor", self) # state /main/injection_monitor/inject self.states["/main/injection_monitor/inject"] = State(3, "/main/injection_monitor/inject", self) # state /main/tracer_monitor self.states["/main/tracer_monitor"] = State(4, "/main/tracer_monitor", self) # state /main/tracer_monitor/trace self.states["/main/tracer_monitor/trace"] = State(5, "/main/tracer_monitor/trace", self) # state /main/simulation_flow self.states["/main/simulation_flow"] = State(6, "/main/simulation_flow", self) # state /main/simulation_flow/initialize self.states["/main/simulation_flow/initialize"] = State(7, "/main/simulation_flow/initialize", self) # state /main/simulation_flow/check_termination self.states["/main/simulation_flow/check_termination"] = State(8, "/main/simulation_flow/check_termination", self) self.states["/main/simulation_flow/check_termination"].setExit(self._main_simulation_flow_check_termination_exit) # state /main/simulation_flow/check_termination/workaround self.states["/main/simulation_flow/check_termination/workaround"] = State(9, "/main/simulation_flow/check_termination/workaround", self) self.states["/main/simulation_flow/check_termination/workaround"].setEnter(self._main_simulation_flow_check_termination_workaround_enter) self.states["/main/simulation_flow/check_termination/workaround"].setExit(self._main_simulation_flow_check_termination_workaround_exit) # state /main/simulation_flow/check_termination/wait self.states["/main/simulation_flow/check_termination/wait"] = State(10, "/main/simulation_flow/check_termination/wait", self) self.states["/main/simulation_flow/check_termination/wait"].setEnter(self._main_simulation_flow_check_termination_wait_enter) self.states["/main/simulation_flow/check_termination/wait"].setExit(self._main_simulation_flow_check_termination_wait_exit) # state /main/simulation_flow/check_termination/small_step_check self.states["/main/simulation_flow/check_termination/small_step_check"] = State(11, "/main/simulation_flow/check_termination/small_step_check", self) # state /main/simulation_flow/check_termination/check_termination self.states["/main/simulation_flow/check_termination/check_termination"] = State(12, "/main/simulation_flow/check_termination/check_termination", self) self.states["/main/simulation_flow/check_termination/check_termination"].setEnter(self._main_simulation_flow_check_termination_check_termination_enter) # state /main/simulation_flow/do_simulation self.states["/main/simulation_flow/do_simulation"] = State(13, "/main/simulation_flow/do_simulation", self) # state /main/simulation_flow/do_simulation/init self.states["/main/simulation_flow/do_simulation/init"] = State(14, "/main/simulation_flow/do_simulation/init", self) self.states["/main/simulation_flow/do_simulation/init"].setExit(self._main_simulation_flow_do_simulation_init_exit) # state /main/simulation_flow/do_simulation/found_internal_imminents self.states["/main/simulation_flow/do_simulation/found_internal_imminents"] = State(15, "/main/simulation_flow/do_simulation/found_internal_imminents", self) self.states["/main/simulation_flow/do_simulation/found_internal_imminents"].setExit(self._main_simulation_flow_do_simulation_found_internal_imminents_exit) # state /main/simulation_flow/do_simulation/computed_outputfunction self.states["/main/simulation_flow/do_simulation/computed_outputfunction"] = State(16, "/main/simulation_flow/do_simulation/computed_outputfunction", self) self.states["/main/simulation_flow/do_simulation/computed_outputfunction"].setExit(self._main_simulation_flow_do_simulation_computed_outputfunction_exit) # state /main/simulation_flow/do_simulation/routed_messages self.states["/main/simulation_flow/do_simulation/routed_messages"] = State(17, "/main/simulation_flow/do_simulation/routed_messages", self) self.states["/main/simulation_flow/do_simulation/routed_messages"].setExit(self._main_simulation_flow_do_simulation_routed_messages_exit) # state /main/simulation_flow/do_simulation/found_all_imminents self.states["/main/simulation_flow/do_simulation/found_all_imminents"] = State(18, "/main/simulation_flow/do_simulation/found_all_imminents", self) self.states["/main/simulation_flow/do_simulation/found_all_imminents"].setExit(self._main_simulation_flow_do_simulation_found_all_imminents_exit) # state /main/simulation_flow/do_simulation/computed_transitions self.states["/main/simulation_flow/do_simulation/computed_transitions"] = State(19, "/main/simulation_flow/do_simulation/computed_transitions", self) self.states["/main/simulation_flow/do_simulation/computed_transitions"].setExit(self._main_simulation_flow_do_simulation_computed_transitions_exit) # state /main/simulation_state self.states["/main/simulation_state"] = State(20, "/main/simulation_state", self) # state /main/simulation_state/paused self.states["/main/simulation_state/paused"] = State(21, "/main/simulation_state/paused", self) # state /main/simulation_state/continuous self.states["/main/simulation_state/continuous"] = State(22, "/main/simulation_state/continuous", self) # state /main/simulation_state/realtime self.states["/main/simulation_state/realtime"] = State(23, "/main/simulation_state/realtime", self) # state /main/simulation_state/big_step self.states["/main/simulation_state/big_step"] = State(24, "/main/simulation_state/big_step", self) # state /main/breakpoint_manager self.states["/main/breakpoint_manager"] = State(25, "/main/breakpoint_manager", self) # state /main/breakpoint_manager/breakpoint_manage self.states["/main/breakpoint_manager/breakpoint_manage"] = State(26, "/main/breakpoint_manager/breakpoint_manage", self) # state /main/reset self.states["/main/reset"] = State(27, "/main/reset", self) # state /main/reset/reset self.states["/main/reset/reset"] = State(28, "/main/reset/reset", self) # add children self.states[""].addChild(self.states["/main"]) self.states["/main"].addChild(self.states["/main/injection_monitor"]) self.states["/main"].addChild(self.states["/main/tracer_monitor"]) self.states["/main"].addChild(self.states["/main/simulation_flow"]) self.states["/main"].addChild(self.states["/main/simulation_state"]) self.states["/main"].addChild(self.states["/main/breakpoint_manager"]) self.states["/main"].addChild(self.states["/main/reset"]) self.states["/main/injection_monitor"].addChild(self.states["/main/injection_monitor/inject"]) self.states["/main/tracer_monitor"].addChild(self.states["/main/tracer_monitor/trace"]) self.states["/main/simulation_flow"].addChild(self.states["/main/simulation_flow/initialize"]) self.states["/main/simulation_flow"].addChild(self.states["/main/simulation_flow/check_termination"]) self.states["/main/simulation_flow"].addChild(self.states["/main/simulation_flow/do_simulation"]) self.states["/main/simulation_flow/check_termination"].addChild(self.states["/main/simulation_flow/check_termination/workaround"]) self.states["/main/simulation_flow/check_termination"].addChild(self.states["/main/simulation_flow/check_termination/wait"]) self.states["/main/simulation_flow/check_termination"].addChild(self.states["/main/simulation_flow/check_termination/small_step_check"]) self.states["/main/simulation_flow/check_termination"].addChild(self.states["/main/simulation_flow/check_termination/check_termination"]) self.states["/main/simulation_flow/do_simulation"].addChild(self.states["/main/simulation_flow/do_simulation/init"]) self.states["/main/simulation_flow/do_simulation"].addChild(self.states["/main/simulation_flow/do_simulation/found_internal_imminents"]) self.states["/main/simulation_flow/do_simulation"].addChild(self.states["/main/simulation_flow/do_simulation/computed_outputfunction"]) self.states["/main/simulation_flow/do_simulation"].addChild(self.states["/main/simulation_flow/do_simulation/routed_messages"]) self.states["/main/simulation_flow/do_simulation"].addChild(self.states["/main/simulation_flow/do_simulation/found_all_imminents"]) self.states["/main/simulation_flow/do_simulation"].addChild(self.states["/main/simulation_flow/do_simulation/computed_transitions"]) self.states["/main/simulation_state"].addChild(self.states["/main/simulation_state/paused"]) self.states["/main/simulation_state"].addChild(self.states["/main/simulation_state/continuous"]) self.states["/main/simulation_state"].addChild(self.states["/main/simulation_state/realtime"]) self.states["/main/simulation_state"].addChild(self.states["/main/simulation_state/big_step"]) self.states["/main/breakpoint_manager"].addChild(self.states["/main/breakpoint_manager/breakpoint_manage"]) self.states["/main/reset"].addChild(self.states["/main/reset/reset"]) self.states[""].fixTree() self.states[""].default_state = self.states["/main"] self.states["/main/injection_monitor"].default_state = self.states["/main/injection_monitor/inject"] self.states["/main/tracer_monitor"].default_state = self.states["/main/tracer_monitor/trace"] self.states["/main/simulation_flow"].default_state = self.states["/main/simulation_flow/initialize"] self.states["/main/simulation_flow/check_termination"].default_state = self.states["/main/simulation_flow/check_termination/workaround"] self.states["/main/simulation_flow/do_simulation"].default_state = self.states["/main/simulation_flow/do_simulation/init"] self.states["/main/simulation_state"].default_state = self.states["/main/simulation_state/paused"] self.states["/main/breakpoint_manager"].default_state = self.states["/main/breakpoint_manager/breakpoint_manage"] self.states["/main/reset"].default_state = self.states["/main/reset/reset"] # transition /main/injection_monitor/inject _main_injection_monitor_inject_0 = Transition(self, self.states["/main/injection_monitor/inject"], [self.states["/main/injection_monitor/inject"]]) _main_injection_monitor_inject_0.setAction(self._main_injection_monitor_inject_0_exec) _main_injection_monitor_inject_0.setTrigger(Event("inject", "request")) self.states["/main/injection_monitor/inject"].addTransition(_main_injection_monitor_inject_0) # transition /main/tracer_monitor/trace _main_tracer_monitor_trace_0 = Transition(self, self.states["/main/tracer_monitor/trace"], [self.states["/main/tracer_monitor/trace"]]) _main_tracer_monitor_trace_0.setAction(self._main_tracer_monitor_trace_0_exec) _main_tracer_monitor_trace_0.setTrigger(Event("trace", "request")) self.states["/main/tracer_monitor/trace"].addTransition(_main_tracer_monitor_trace_0) # transition /main/simulation_flow/initialize _main_simulation_flow_initialize_0 = Transition(self, self.states["/main/simulation_flow/initialize"], [self.states["/main/simulation_flow/check_termination"]]) _main_simulation_flow_initialize_0.setAction(self._main_simulation_flow_initialize_0_exec) _main_simulation_flow_initialize_0.setTrigger(None) self.states["/main/simulation_flow/initialize"].addTransition(_main_simulation_flow_initialize_0) # transition /main/simulation_flow/check_termination/workaround _main_simulation_flow_check_termination_workaround_0 = Transition(self, self.states["/main/simulation_flow/check_termination/workaround"], [self.states["/main/simulation_flow/check_termination/check_termination"]]) _main_simulation_flow_check_termination_workaround_0.setTrigger(Event("_0after")) self.states["/main/simulation_flow/check_termination/workaround"].addTransition(_main_simulation_flow_check_termination_workaround_0) # transition /main/simulation_flow/check_termination/wait _main_simulation_flow_check_termination_wait_0 = Transition(self, self.states["/main/simulation_flow/check_termination/wait"], [self.states["/main/simulation_flow/check_termination/check_termination"]]) _main_simulation_flow_check_termination_wait_0.setTrigger(Event("_1after")) self.states["/main/simulation_flow/check_termination/wait"].addTransition(_main_simulation_flow_check_termination_wait_0) _main_simulation_flow_check_termination_wait_1 = Transition(self, self.states["/main/simulation_flow/check_termination/wait"], [self.states["/main/simulation_flow/check_termination/check_termination"]]) _main_simulation_flow_check_termination_wait_1.setTrigger(None) _main_simulation_flow_check_termination_wait_1.setGuard(self._main_simulation_flow_check_termination_wait_1_guard) self.states["/main/simulation_flow/check_termination/wait"].addTransition(_main_simulation_flow_check_termination_wait_1) # transition /main/simulation_flow/check_termination/small_step_check _main_simulation_flow_check_termination_small_step_check_0 = Transition(self, self.states["/main/simulation_flow/check_termination/small_step_check"], [self.states["/main/simulation_flow/do_simulation"]]) _main_simulation_flow_check_termination_small_step_check_0.setTrigger(None) _main_simulation_flow_check_termination_small_step_check_0.setGuard(self._main_simulation_flow_check_termination_small_step_check_0_guard) self.states["/main/simulation_flow/check_termination/small_step_check"].addTransition(_main_simulation_flow_check_termination_small_step_check_0) _main_simulation_flow_check_termination_small_step_check_1 = Transition(self, self.states["/main/simulation_flow/check_termination/small_step_check"], [self.states["/main/simulation_flow/check_termination/check_termination"]]) _main_simulation_flow_check_termination_small_step_check_1.setAction(self._main_simulation_flow_check_termination_small_step_check_1_exec) _main_simulation_flow_check_termination_small_step_check_1.setTrigger(None) _main_simulation_flow_check_termination_small_step_check_1.setGuard(self._main_simulation_flow_check_termination_small_step_check_1_guard) self.states["/main/simulation_flow/check_termination/small_step_check"].addTransition(_main_simulation_flow_check_termination_small_step_check_1) _main_simulation_flow_check_termination_small_step_check_2 = Transition(self, self.states["/main/simulation_flow/check_termination/small_step_check"], [self.states["/main/simulation_flow/check_termination/check_termination"]]) _main_simulation_flow_check_termination_small_step_check_2.setAction(self._main_simulation_flow_check_termination_small_step_check_2_exec) _main_simulation_flow_check_termination_small_step_check_2.setTrigger(None) _main_simulation_flow_check_termination_small_step_check_2.setGuard(self._main_simulation_flow_check_termination_small_step_check_2_guard) self.states["/main/simulation_flow/check_termination/small_step_check"].addTransition(_main_simulation_flow_check_termination_small_step_check_2) # transition /main/simulation_flow/check_termination/check_termination _main_simulation_flow_check_termination_check_termination_0 = Transition(self, self.states["/main/simulation_flow/check_termination/check_termination"], [self.states["/main/simulation_flow/do_simulation"]]) _main_simulation_flow_check_termination_check_termination_0.setTrigger(None) _main_simulation_flow_check_termination_check_termination_0.setGuard(self._main_simulation_flow_check_termination_check_termination_0_guard) self.states["/main/simulation_flow/check_termination/check_termination"].addTransition(_main_simulation_flow_check_termination_check_termination_0) _main_simulation_flow_check_termination_check_termination_1 = Transition(self, self.states["/main/simulation_flow/check_termination/check_termination"], [self.states["/main/simulation_flow/do_simulation"]]) _main_simulation_flow_check_termination_check_termination_1.setTrigger(None) _main_simulation_flow_check_termination_check_termination_1.setGuard(self._main_simulation_flow_check_termination_check_termination_1_guard) self.states["/main/simulation_flow/check_termination/check_termination"].addTransition(_main_simulation_flow_check_termination_check_termination_1) _main_simulation_flow_check_termination_check_termination_2 = Transition(self, self.states["/main/simulation_flow/check_termination/check_termination"], [self.states["/main/simulation_flow/check_termination/wait"]]) _main_simulation_flow_check_termination_check_termination_2.setTrigger(None) _main_simulation_flow_check_termination_check_termination_2.setGuard(self._main_simulation_flow_check_termination_check_termination_2_guard) self.states["/main/simulation_flow/check_termination/check_termination"].addTransition(_main_simulation_flow_check_termination_check_termination_2) _main_simulation_flow_check_termination_check_termination_3 = Transition(self, self.states["/main/simulation_flow/check_termination/check_termination"], [self.states["/main/simulation_flow/do_simulation"]]) _main_simulation_flow_check_termination_check_termination_3.setTrigger(None) _main_simulation_flow_check_termination_check_termination_3.setGuard(self._main_simulation_flow_check_termination_check_termination_3_guard) self.states["/main/simulation_flow/check_termination/check_termination"].addTransition(_main_simulation_flow_check_termination_check_termination_3) _main_simulation_flow_check_termination_check_termination_4 = Transition(self, self.states["/main/simulation_flow/check_termination/check_termination"], [self.states["/main/simulation_flow/check_termination/small_step_check"]]) _main_simulation_flow_check_termination_check_termination_4.setAction(self._main_simulation_flow_check_termination_check_termination_4_exec) _main_simulation_flow_check_termination_check_termination_4.setTrigger(Event("small_step", "request")) self.states["/main/simulation_flow/check_termination/check_termination"].addTransition(_main_simulation_flow_check_termination_check_termination_4) _main_simulation_flow_check_termination_check_termination_5 = Transition(self, self.states["/main/simulation_flow/check_termination/check_termination"], [self.states["/main/simulation_flow/check_termination/workaround"]]) _main_simulation_flow_check_termination_check_termination_5.setAction(self._main_simulation_flow_check_termination_check_termination_5_exec) _main_simulation_flow_check_termination_check_termination_5.setTrigger(None) _main_simulation_flow_check_termination_check_termination_5.setGuard(self._main_simulation_flow_check_termination_check_termination_5_guard) self.states["/main/simulation_flow/check_termination/check_termination"].addTransition(_main_simulation_flow_check_termination_check_termination_5) _main_simulation_flow_check_termination_check_termination_6 = Transition(self, self.states["/main/simulation_flow/check_termination/check_termination"], [self.states["/main/simulation_flow/check_termination/workaround"]]) _main_simulation_flow_check_termination_check_termination_6.setAction(self._main_simulation_flow_check_termination_check_termination_6_exec) _main_simulation_flow_check_termination_check_termination_6.setTrigger(None) _main_simulation_flow_check_termination_check_termination_6.setGuard(self._main_simulation_flow_check_termination_check_termination_6_guard) self.states["/main/simulation_flow/check_termination/check_termination"].addTransition(_main_simulation_flow_check_termination_check_termination_6) _main_simulation_flow_check_termination_check_termination_7 = Transition(self, self.states["/main/simulation_flow/check_termination/check_termination"], [self.states["/main/simulation_flow/check_termination/workaround"]]) _main_simulation_flow_check_termination_check_termination_7.setAction(self._main_simulation_flow_check_termination_check_termination_7_exec) _main_simulation_flow_check_termination_check_termination_7.setTrigger(None) _main_simulation_flow_check_termination_check_termination_7.setGuard(self._main_simulation_flow_check_termination_check_termination_7_guard) self.states["/main/simulation_flow/check_termination/check_termination"].addTransition(_main_simulation_flow_check_termination_check_termination_7) _main_simulation_flow_check_termination_check_termination_8 = Transition(self, self.states["/main/simulation_flow/check_termination/check_termination"], [self.states["/main/simulation_flow/check_termination/workaround"]]) _main_simulation_flow_check_termination_check_termination_8.setAction(self._main_simulation_flow_check_termination_check_termination_8_exec) _main_simulation_flow_check_termination_check_termination_8.setTrigger(None) _main_simulation_flow_check_termination_check_termination_8.setGuard(self._main_simulation_flow_check_termination_check_termination_8_guard) self.states["/main/simulation_flow/check_termination/check_termination"].addTransition(_main_simulation_flow_check_termination_check_termination_8) _main_simulation_flow_check_termination_check_termination_9 = Transition(self, self.states["/main/simulation_flow/check_termination/check_termination"], [self.states["/main/simulation_flow/check_termination/workaround"]]) _main_simulation_flow_check_termination_check_termination_9.setAction(self._main_simulation_flow_check_termination_check_termination_9_exec) _main_simulation_flow_check_termination_check_termination_9.setTrigger(Event("god_event", "request")) _main_simulation_flow_check_termination_check_termination_9.setGuard(self._main_simulation_flow_check_termination_check_termination_9_guard) self.states["/main/simulation_flow/check_termination/check_termination"].addTransition(_main_simulation_flow_check_termination_check_termination_9) _main_simulation_flow_check_termination_check_termination_10 = Transition(self, self.states["/main/simulation_flow/check_termination/check_termination"], [self.states["/main/simulation_flow/check_termination/workaround"]]) _main_simulation_flow_check_termination_check_termination_10.setAction(self._main_simulation_flow_check_termination_check_termination_10_exec) _main_simulation_flow_check_termination_check_termination_10.setTrigger(Event("backwards_step", "request")) _main_simulation_flow_check_termination_check_termination_10.setGuard(self._main_simulation_flow_check_termination_check_termination_10_guard) self.states["/main/simulation_flow/check_termination/check_termination"].addTransition(_main_simulation_flow_check_termination_check_termination_10) # transition /main/simulation_flow/do_simulation/init _main_simulation_flow_do_simulation_init_0 = Transition(self, self.states["/main/simulation_flow/do_simulation/init"], [self.states["/main/simulation_flow/do_simulation/found_internal_imminents"]]) _main_simulation_flow_do_simulation_init_0.setTrigger(None) _main_simulation_flow_do_simulation_init_0.setGuard(self._main_simulation_flow_do_simulation_init_0_guard) self.states["/main/simulation_flow/do_simulation/init"].addTransition(_main_simulation_flow_do_simulation_init_0) _main_simulation_flow_do_simulation_init_1 = Transition(self, self.states["/main/simulation_flow/do_simulation/init"], [self.states["/main/simulation_flow/do_simulation/found_internal_imminents"]]) _main_simulation_flow_do_simulation_init_1.setAction(self._main_simulation_flow_do_simulation_init_1_exec) _main_simulation_flow_do_simulation_init_1.setTrigger(None) _main_simulation_flow_do_simulation_init_1.setGuard(self._main_simulation_flow_do_simulation_init_1_guard) self.states["/main/simulation_flow/do_simulation/init"].addTransition(_main_simulation_flow_do_simulation_init_1) # transition /main/simulation_flow/do_simulation/found_internal_imminents _main_simulation_flow_do_simulation_found_internal_imminents_0 = Transition(self, self.states["/main/simulation_flow/do_simulation/found_internal_imminents"], [self.states["/main/simulation_flow/do_simulation/computed_outputfunction"]]) _main_simulation_flow_do_simulation_found_internal_imminents_0.setTrigger(None) _main_simulation_flow_do_simulation_found_internal_imminents_0.setGuard(self._main_simulation_flow_do_simulation_found_internal_imminents_0_guard) self.states["/main/simulation_flow/do_simulation/found_internal_imminents"].addTransition(_main_simulation_flow_do_simulation_found_internal_imminents_0) _main_simulation_flow_do_simulation_found_internal_imminents_1 = Transition(self, self.states["/main/simulation_flow/do_simulation/found_internal_imminents"], [self.states["/main/simulation_flow/do_simulation/computed_outputfunction"]]) _main_simulation_flow_do_simulation_found_internal_imminents_1.setAction(self._main_simulation_flow_do_simulation_found_internal_imminents_1_exec) _main_simulation_flow_do_simulation_found_internal_imminents_1.setTrigger(Event("small_step", "request")) self.states["/main/simulation_flow/do_simulation/found_internal_imminents"].addTransition(_main_simulation_flow_do_simulation_found_internal_imminents_1) # transition /main/simulation_flow/do_simulation/computed_outputfunction _main_simulation_flow_do_simulation_computed_outputfunction_0 = Transition(self, self.states["/main/simulation_flow/do_simulation/computed_outputfunction"], [self.states["/main/simulation_flow/do_simulation/routed_messages"]]) _main_simulation_flow_do_simulation_computed_outputfunction_0.setTrigger(None) _main_simulation_flow_do_simulation_computed_outputfunction_0.setGuard(self._main_simulation_flow_do_simulation_computed_outputfunction_0_guard) self.states["/main/simulation_flow/do_simulation/computed_outputfunction"].addTransition(_main_simulation_flow_do_simulation_computed_outputfunction_0) _main_simulation_flow_do_simulation_computed_outputfunction_1 = Transition(self, self.states["/main/simulation_flow/do_simulation/computed_outputfunction"], [self.states["/main/simulation_flow/do_simulation/routed_messages"]]) _main_simulation_flow_do_simulation_computed_outputfunction_1.setAction(self._main_simulation_flow_do_simulation_computed_outputfunction_1_exec) _main_simulation_flow_do_simulation_computed_outputfunction_1.setTrigger(Event("small_step", "request")) self.states["/main/simulation_flow/do_simulation/computed_outputfunction"].addTransition(_main_simulation_flow_do_simulation_computed_outputfunction_1) # transition /main/simulation_flow/do_simulation/routed_messages _main_simulation_flow_do_simulation_routed_messages_0 = Transition(self, self.states["/main/simulation_flow/do_simulation/routed_messages"], [self.states["/main/simulation_flow/do_simulation/found_all_imminents"]]) _main_simulation_flow_do_simulation_routed_messages_0.setTrigger(None) _main_simulation_flow_do_simulation_routed_messages_0.setGuard(self._main_simulation_flow_do_simulation_routed_messages_0_guard) self.states["/main/simulation_flow/do_simulation/routed_messages"].addTransition(_main_simulation_flow_do_simulation_routed_messages_0) _main_simulation_flow_do_simulation_routed_messages_1 = Transition(self, self.states["/main/simulation_flow/do_simulation/routed_messages"], [self.states["/main/simulation_flow/do_simulation/found_all_imminents"]]) _main_simulation_flow_do_simulation_routed_messages_1.setAction(self._main_simulation_flow_do_simulation_routed_messages_1_exec) _main_simulation_flow_do_simulation_routed_messages_1.setTrigger(Event("small_step", "request")) self.states["/main/simulation_flow/do_simulation/routed_messages"].addTransition(_main_simulation_flow_do_simulation_routed_messages_1) # transition /main/simulation_flow/do_simulation/found_all_imminents _main_simulation_flow_do_simulation_found_all_imminents_0 = Transition(self, self.states["/main/simulation_flow/do_simulation/found_all_imminents"], [self.states["/main/simulation_flow/do_simulation/computed_transitions"]]) _main_simulation_flow_do_simulation_found_all_imminents_0.setTrigger(None) _main_simulation_flow_do_simulation_found_all_imminents_0.setGuard(self._main_simulation_flow_do_simulation_found_all_imminents_0_guard) self.states["/main/simulation_flow/do_simulation/found_all_imminents"].addTransition(_main_simulation_flow_do_simulation_found_all_imminents_0) _main_simulation_flow_do_simulation_found_all_imminents_1 = Transition(self, self.states["/main/simulation_flow/do_simulation/found_all_imminents"], [self.states["/main/simulation_flow/do_simulation/computed_transitions"]]) _main_simulation_flow_do_simulation_found_all_imminents_1.setAction(self._main_simulation_flow_do_simulation_found_all_imminents_1_exec) _main_simulation_flow_do_simulation_found_all_imminents_1.setTrigger(Event("small_step", "request")) self.states["/main/simulation_flow/do_simulation/found_all_imminents"].addTransition(_main_simulation_flow_do_simulation_found_all_imminents_1) # transition /main/simulation_flow/do_simulation/computed_transitions _main_simulation_flow_do_simulation_computed_transitions_0 = Transition(self, self.states["/main/simulation_flow/do_simulation/computed_transitions"], [self.states["/main/simulation_flow/check_termination"]]) _main_simulation_flow_do_simulation_computed_transitions_0.setTrigger(None) _main_simulation_flow_do_simulation_computed_transitions_0.setGuard(self._main_simulation_flow_do_simulation_computed_transitions_0_guard) self.states["/main/simulation_flow/do_simulation/computed_transitions"].addTransition(_main_simulation_flow_do_simulation_computed_transitions_0) _main_simulation_flow_do_simulation_computed_transitions_1 = Transition(self, self.states["/main/simulation_flow/do_simulation/computed_transitions"], [self.states["/main/simulation_flow/check_termination"]]) _main_simulation_flow_do_simulation_computed_transitions_1.setAction(self._main_simulation_flow_do_simulation_computed_transitions_1_exec) _main_simulation_flow_do_simulation_computed_transitions_1.setTrigger(Event("small_step", "request")) self.states["/main/simulation_flow/do_simulation/computed_transitions"].addTransition(_main_simulation_flow_do_simulation_computed_transitions_1) _main_simulation_flow_do_simulation_computed_transitions_2 = Transition(self, self.states["/main/simulation_flow/do_simulation/computed_transitions"], [self.states["/main/simulation_flow/check_termination"]]) _main_simulation_flow_do_simulation_computed_transitions_2.setAction(self._main_simulation_flow_do_simulation_computed_transitions_2_exec) _main_simulation_flow_do_simulation_computed_transitions_2.setTrigger(None) _main_simulation_flow_do_simulation_computed_transitions_2.setGuard(self._main_simulation_flow_do_simulation_computed_transitions_2_guard) self.states["/main/simulation_flow/do_simulation/computed_transitions"].addTransition(_main_simulation_flow_do_simulation_computed_transitions_2) # transition /main/simulation_state/paused _main_simulation_state_paused_0 = Transition(self, self.states["/main/simulation_state/paused"], [self.states["/main/simulation_state/continuous"]]) _main_simulation_state_paused_0.setAction(self._main_simulation_state_paused_0_exec) _main_simulation_state_paused_0.setTrigger(Event("simulate", "request")) self.states["/main/simulation_state/paused"].addTransition(_main_simulation_state_paused_0) _main_simulation_state_paused_1 = Transition(self, self.states["/main/simulation_state/paused"], [self.states["/main/simulation_state/realtime"]]) _main_simulation_state_paused_1.setAction(self._main_simulation_state_paused_1_exec) _main_simulation_state_paused_1.setTrigger(Event("realtime", "request")) self.states["/main/simulation_state/paused"].addTransition(_main_simulation_state_paused_1) _main_simulation_state_paused_2 = Transition(self, self.states["/main/simulation_state/paused"], [self.states["/main/simulation_state/big_step"]]) _main_simulation_state_paused_2.setAction(self._main_simulation_state_paused_2_exec) _main_simulation_state_paused_2.setTrigger(Event("big_step", "request")) self.states["/main/simulation_state/paused"].addTransition(_main_simulation_state_paused_2) # transition /main/simulation_state/continuous _main_simulation_state_continuous_0 = Transition(self, self.states["/main/simulation_state/continuous"], [self.states["/main/simulation_state/continuous"]]) _main_simulation_state_continuous_0.setAction(self._main_simulation_state_continuous_0_exec) _main_simulation_state_continuous_0.setTrigger(Event("pause", "request")) self.states["/main/simulation_state/continuous"].addTransition(_main_simulation_state_continuous_0) _main_simulation_state_continuous_1 = Transition(self, self.states["/main/simulation_state/continuous"], [self.states["/main/simulation_state/paused"]]) _main_simulation_state_continuous_1.setAction(self._main_simulation_state_continuous_1_exec) _main_simulation_state_continuous_1.setTrigger(Event("termination_condition", None)) self.states["/main/simulation_state/continuous"].addTransition(_main_simulation_state_continuous_1) # transition /main/simulation_state/realtime _main_simulation_state_realtime_0 = Transition(self, self.states["/main/simulation_state/realtime"], [self.states["/main/simulation_state/realtime"]]) _main_simulation_state_realtime_0.setAction(self._main_simulation_state_realtime_0_exec) _main_simulation_state_realtime_0.setTrigger(Event("pause", "request")) self.states["/main/simulation_state/realtime"].addTransition(_main_simulation_state_realtime_0) _main_simulation_state_realtime_1 = Transition(self, self.states["/main/simulation_state/realtime"], [self.states["/main/simulation_state/paused"]]) _main_simulation_state_realtime_1.setAction(self._main_simulation_state_realtime_1_exec) _main_simulation_state_realtime_1.setTrigger(Event("termination_condition", None)) self.states["/main/simulation_state/realtime"].addTransition(_main_simulation_state_realtime_1) # transition /main/simulation_state/big_step _main_simulation_state_big_step_0 = Transition(self, self.states["/main/simulation_state/big_step"], [self.states["/main/simulation_state/paused"]]) _main_simulation_state_big_step_0.setTrigger(Event("big_step_done", None)) self.states["/main/simulation_state/big_step"].addTransition(_main_simulation_state_big_step_0) _main_simulation_state_big_step_1 = Transition(self, self.states["/main/simulation_state/big_step"], [self.states["/main/simulation_state/paused"]]) _main_simulation_state_big_step_1.setAction(self._main_simulation_state_big_step_1_exec) _main_simulation_state_big_step_1.setTrigger(Event("termination_condition", None)) self.states["/main/simulation_state/big_step"].addTransition(_main_simulation_state_big_step_1) # transition /main/breakpoint_manager/breakpoint_manage _main_breakpoint_manager_breakpoint_manage_0 = Transition(self, self.states["/main/breakpoint_manager/breakpoint_manage"], [self.states["/main/breakpoint_manager/breakpoint_manage"]]) _main_breakpoint_manager_breakpoint_manage_0.setAction(self._main_breakpoint_manager_breakpoint_manage_0_exec) _main_breakpoint_manager_breakpoint_manage_0.setTrigger(Event("add_breakpoint", "request")) self.states["/main/breakpoint_manager/breakpoint_manage"].addTransition(_main_breakpoint_manager_breakpoint_manage_0) _main_breakpoint_manager_breakpoint_manage_1 = Transition(self, self.states["/main/breakpoint_manager/breakpoint_manage"], [self.states["/main/breakpoint_manager/breakpoint_manage"]]) _main_breakpoint_manager_breakpoint_manage_1.setAction(self._main_breakpoint_manager_breakpoint_manage_1_exec) _main_breakpoint_manager_breakpoint_manage_1.setTrigger(Event("del_breakpoint", "request")) self.states["/main/breakpoint_manager/breakpoint_manage"].addTransition(_main_breakpoint_manager_breakpoint_manage_1) _main_breakpoint_manager_breakpoint_manage_2 = Transition(self, self.states["/main/breakpoint_manager/breakpoint_manage"], [self.states["/main/breakpoint_manager/breakpoint_manage"]]) _main_breakpoint_manager_breakpoint_manage_2.setAction(self._main_breakpoint_manager_breakpoint_manage_2_exec) _main_breakpoint_manager_breakpoint_manage_2.setTrigger(Event("toggle_breakpoint", "request")) self.states["/main/breakpoint_manager/breakpoint_manage"].addTransition(_main_breakpoint_manager_breakpoint_manage_2) # transition /main/reset/reset _main_reset_reset_0 = Transition(self, self.states["/main/reset/reset"], [self.states["/main/reset/reset"]]) _main_reset_reset_0.setAction(self._main_reset_reset_0_exec) _main_reset_reset_0.setTrigger(Event("reset", "request")) self.states["/main/reset/reset"].addTransition(_main_reset_reset_0) def _main_simulation_flow_check_termination_exit(self): self.simulation_time = self.timeNext def _main_simulation_flow_check_termination_workaround_enter(self): self.addTimer(0, 0) def _main_simulation_flow_check_termination_workaround_exit(self): self.removeTimer(0) def _main_simulation_flow_check_termination_wait_enter(self): self.addTimer(1, self.calculate_after()) def _main_simulation_flow_check_termination_wait_exit(self): self.removeTimer(1) diff = time.time() - self.realtime_starttime self.simulation_time = (diff / self.realtime_scale, 1) def _main_simulation_flow_check_termination_check_termination_enter(self): self.compute_timeNext() self.the_time = self.calculate_after() def _main_simulation_flow_do_simulation_init_exit(self): self.find_internal_imminents() def _main_simulation_flow_do_simulation_found_internal_imminents_exit(self): self.compute_outputfunction() def _main_simulation_flow_do_simulation_computed_outputfunction_exit(self): self.route_messages() self.process_injects() def _main_simulation_flow_do_simulation_routed_messages_exit(self): self.find_all_imminents() def _main_simulation_flow_do_simulation_found_all_imminents_exit(self): self.compute_transitions() def _main_simulation_flow_do_simulation_computed_transitions_exit(self): self.compute_ta() def _main_injection_monitor_inject_0_exec(self, parameters): configuration = parameters[0] configuration["time"] = (configuration["time"], 1) self.inject_queue.append(configuration) self.inject_queue.sort(key=lambda i: i["time"]) self.big_step.outputEvent(Event("inject_ok", "reply", [])) def _main_tracer_monitor_trace_0_exec(self, parameters): filename = parameters[0] if filename is not None: self.trace_file = open(filename, 'w') else: self.trace_file = None self.big_step.outputEvent(Event("trace_config_ok", "reply", [])) def _main_simulation_flow_initialize_0_exec(self, parameters): self.big_step.outputEvent(Event("all_states", "reply", [self.simulation_time, {m.getModelFullName(): (m.timeNext, m.state) for m in self.model.componentSet}])) def _main_simulation_flow_check_termination_wait_1_guard(self, parameters): return self.inState(["/main/simulation_state/paused"]) def _main_simulation_flow_check_termination_small_step_check_0_guard(self, parameters): return self.should_terminate(False) == -2 def _main_simulation_flow_check_termination_small_step_check_1_exec(self, parameters): self.raiseInternalEvent(Event("termination_condition", None, [])) def _main_simulation_flow_check_termination_small_step_check_1_guard(self, parameters): return self.should_terminate(False) == -1 def _main_simulation_flow_check_termination_small_step_check_2_exec(self, parameters): breakpoint_id = self.process_breakpoints(False) self.big_step.outputEvent(Event("breakpoint_triggered", "reply", [breakpoint_id])) self.raiseInternalEvent(Event("termination_condition", None, [])) def _main_simulation_flow_check_termination_small_step_check_2_guard(self, parameters): return self.should_terminate(False) > -1 def _main_simulation_flow_check_termination_check_termination_0_guard(self, parameters): return self.inState(["/main/simulation_state/continuous"]) and (self.should_terminate(False) == -2) def _main_simulation_flow_check_termination_check_termination_1_guard(self, parameters): return self.inState(["/main/simulation_state/big_step"]) and (self.should_terminate(False) == -2) def _main_simulation_flow_check_termination_check_termination_2_guard(self, parameters): return self.inState(["/main/simulation_state/realtime"]) and (self.should_terminate(True) == -2) and (self.the_time > 0.0) def _main_simulation_flow_check_termination_check_termination_3_guard(self, parameters): return self.inState(["/main/simulation_state/realtime"]) and (self.should_terminate(True) == -2) and (self.the_time <= 0.0) def _main_simulation_flow_check_termination_check_termination_4_exec(self, parameters): configuration = parameters[0] self.parse_options(configuration) def _main_simulation_flow_check_termination_check_termination_5_exec(self, parameters): self.raiseInternalEvent(Event("termination_condition", None, [])) def _main_simulation_flow_check_termination_check_termination_5_guard(self, parameters): return (not self.inState(["/main/simulation_state/paused"]) and self.inState(["/main/simulation_state/realtime"]) and (self.should_terminate(True) == -1)) def _main_simulation_flow_check_termination_check_termination_6_exec(self, parameters): self.raiseInternalEvent(Event("termination_condition", None, [])) def _main_simulation_flow_check_termination_check_termination_6_guard(self, parameters): return (not self.inState(["/main/simulation_state/paused"]) and not self.inState(["/main/simulation_state/realtime"]) and (self.should_terminate(False) == -1)) def _main_simulation_flow_check_termination_check_termination_7_exec(self, parameters): breakpoint_id = self.process_breakpoints(True) self.big_step.outputEvent(Event("breakpoint_triggered", "reply", [breakpoint_id])) self.raiseInternalEvent(Event("termination_condition", None, [])) def _main_simulation_flow_check_termination_check_termination_7_guard(self, parameters): return (not self.inState(["/main/simulation_state/paused"])) and self.inState(["/main/simulation_state/realtime"]) and (self.should_terminate(True) > -1) def _main_simulation_flow_check_termination_check_termination_8_exec(self, parameters): breakpoint_id = self.process_breakpoints(False) self.big_step.outputEvent(Event("breakpoint_triggered", "reply", [breakpoint_id])) self.raiseInternalEvent(Event("termination_condition", None, [])) def _main_simulation_flow_check_termination_check_termination_8_guard(self, parameters): return (not self.inState(["/main/simulation_state/paused"])) and not self.inState(["/main/simulation_state/realtime"]) and (self.should_terminate(False) > -1) def _main_simulation_flow_check_termination_check_termination_9_exec(self, parameters): configuration = parameters[0] modelname = configuration["model"] state_attribute = configuration["attribute"] new_value = configuration["value"] model = self.find_model_with_name(modelname) setattr(model.state, state_attribute, new_value) # Call the timeadvance method again and compute new ta ta = model.timeAdvance() model.timeNext = (model.timeLast[0] + ta, 1 if ta else (model.timeLast[1] + 1)) self.model.scheduler.massReschedule([model]) # self.simulation_time = self.model.scheduler.readFirst() self.big_step.outputEvent(Event("god_event_ok", "reply", [{model.getModelFullName(): str(model.state)}])) self.big_step.outputEvent(Event("new_tn", "reply", [self.simulation_time, {model.getModelFullName(): model.timeNext}])) def _main_simulation_flow_check_termination_check_termination_9_guard(self, parameters): configuration = parameters[0] return self.inState(["/main/simulation_state/paused"]) def _main_simulation_flow_check_termination_check_termination_10_exec(self, parameters): self.rollback_step() self.big_step.outputEvent(Event("all_states", "reply", [self.simulation_time, {m.getModelFullName(): (m.timeNext, m.state) for m in self.model.componentSet}])) def _main_simulation_flow_check_termination_check_termination_10_guard(self, parameters): return self.inState(["/main/simulation_state/paused"]) def _main_simulation_flow_do_simulation_init_0_guard(self, parameters): return not self.inState(["/main/simulation_state/paused"]) def _main_simulation_flow_do_simulation_init_1_exec(self, parameters): self.big_step.outputEvent(Event("imminents", "reply", [self.simulation_time, self.serialize('imminents', self.imminents)])) def _main_simulation_flow_do_simulation_init_1_guard(self, parameters): return self.inState(["/main/simulation_state/paused"]) def _main_simulation_flow_do_simulation_found_internal_imminents_0_guard(self, parameters): return not self.inState(["/main/simulation_state/paused"]) def _main_simulation_flow_do_simulation_found_internal_imminents_1_exec(self, parameters): self.big_step.outputEvent(Event("outbags", "reply", [self.simulation_time, self.serialize('outbags', self.outbags)])) def _main_simulation_flow_do_simulation_computed_outputfunction_0_guard(self, parameters): return not self.inState(["/main/simulation_state/paused"]) def _main_simulation_flow_do_simulation_computed_outputfunction_1_exec(self, parameters): self.big_step.outputEvent(Event("inbags", "reply", [self.simulation_time, self.serialize('inbags', self.inbags)])) def _main_simulation_flow_do_simulation_routed_messages_0_guard(self, parameters): return not self.inState(["/main/simulation_state/paused"]) def _main_simulation_flow_do_simulation_routed_messages_1_exec(self, parameters): self.big_step.outputEvent(Event("transitioning", "reply", [self.simulation_time, self.serialize('transitioning', self.transitioning)])) def _main_simulation_flow_do_simulation_found_all_imminents_0_guard(self, parameters): return not self.inState(["/main/simulation_state/paused"]) def _main_simulation_flow_do_simulation_found_all_imminents_1_exec(self, parameters): self.big_step.outputEvent(Event("new_states", "reply", [self.simulation_time, self.serialize('new_states', self.new_states)])) def _main_simulation_flow_do_simulation_computed_transitions_0_guard(self, parameters): return self.inState(["/main/simulation_state/continuous"]) def _main_simulation_flow_do_simulation_computed_transitions_1_exec(self, parameters): self.big_step.outputEvent(Event("new_tn", "reply", [self.simulation_time, self.serialize('new_tn', self.new_tn)])) def _main_simulation_flow_do_simulation_computed_transitions_2_exec(self, parameters): self.raiseInternalEvent(Event("big_step_done", None, [])) self.big_step.outputEvent(Event("new_states", "reply", [self.simulation_time, self.serialize('new_states', self.new_states)])) self.big_step.outputEvent(Event("new_tn", "reply", [self.simulation_time, self.serialize('new_tn', self.new_tn)])) def _main_simulation_flow_do_simulation_computed_transitions_2_guard(self, parameters): return self.inState(["/main/simulation_state/realtime"]) or self.inState(["/main/simulation_state/big_step"]) def _main_simulation_state_paused_0_exec(self, parameters): configuration = parameters[0] self.parse_options(configuration) def _main_simulation_state_paused_1_exec(self, parameters): configuration = parameters[0] self.parse_options(configuration) def _main_simulation_state_paused_2_exec(self, parameters): configuration = parameters[0] self.parse_options(configuration) def _main_simulation_state_continuous_0_exec(self, parameters): # Just override termination condition self.termination_condition = lambda i, j, k : True self.termination_time = None def _main_simulation_state_continuous_1_exec(self, parameters): self.big_step.outputEvent(Event("terminate", "reply", [self.simulation_time])) self.flush_file() self.big_step.outputEvent(Event("all_states", "reply", [self.simulation_time, {m.getModelFullName(): (m.timeNext, m.state) for m in self.model.componentSet}])) def _main_simulation_state_realtime_0_exec(self, parameters): # Just override termination condition self.termination_condition = lambda i, j, k : True self.termination_time = None # Don't forget to correctly set the simulation time diff = time.time() - self.realtime_starttime self.simulation_time = (diff / self.realtime_scale, 1) def _main_simulation_state_realtime_1_exec(self, parameters): self.big_step.outputEvent(Event("terminate", "reply", [self.simulation_time])) self.flush_file() def _main_simulation_state_big_step_1_exec(self, parameters): self.big_step.outputEvent(Event("terminate", "reply", [self.simulation_time])) self.flush_file() def _main_breakpoint_manager_breakpoint_manage_0_exec(self, parameters): breakpoint_id = parameters[0] function = parameters[1] enabled = parameters[2] disable_on_trigger = parameters[3] self.breakpoints.append(Breakpoint(breakpoint_id, function, enabled, disable_on_trigger)) def _main_breakpoint_manager_breakpoint_manage_1_exec(self, parameters): del_breakpoint_id = parameters[0] self.breakpoints = [breakpoint for breakpoint in self.breakpoints if breakpoint.id != del_breakpoint_id] def _main_breakpoint_manager_breakpoint_manage_2_exec(self, parameters): breakpoint_id = parameters[0] enabled = parameters[1] for breakpoint in self.breakpoints: if breakpoint.id == breakpoint_id: breakpoint.enabled = enabled break def _main_reset_reset_0_exec(self, parameters): for model in self.model.componentSet: model.state = pickle.loads(self.save_model[model][1]) model.elapsed = self.save_model[model][0] model.timeLast = (-model.elapsed, 1) ta = model.timeAdvance() model.timeNext = (model.timeLast[0] + ta, 1) self.simulation_time = (0.0, 0) self.model.scheduler.massReschedule(self.model.componentSet) # Reset trace file if self.trace_file is not None: self.trace_file = open(self.trace_file.name, 'w') self.big_step.outputEvent(Event("all_states", "reply", [(0.0, 0), {m.getModelFullName(): (m.timeNext, m.state) for m in self.model.componentSet}])) def initializeStatechart(self): # enter default state self.default_targets = self.states["/main"].getEffectiveTargetStates() RuntimeClassBase.initializeStatechart(self) class ObjectManager(ObjectManagerBase): def __init__(self, controller): ObjectManagerBase.__init__(self, controller) def instantiate(self, class_name, construct_params): if class_name == "SCCDSimulator": instance = SCCDSimulator(self.controller, construct_params[0]) instance.associations = {} else: raise Exception("Cannot instantiate class " + class_name) return instance class Controller(ThreadsControllerBase): def __init__(self, model, keep_running = None, behind_schedule_callback = None): if keep_running == None: keep_running = True if behind_schedule_callback == None: behind_schedule_callback = None ThreadsControllerBase.__init__(self, ObjectManager(self), keep_running, behind_schedule_callback) self.addInputPort("request") self.addOutputPort("reply") self.object_manager.createInstance("SCCDSimulator", [model])