|
- """
- Generated by Statechart compiler by Glenn De Jonghe and Joeri Exelmans
- Date: Thu Mar 17 11:16:46 2016
- Model author: Simon Van Mierlo/Yentl Van Tendeloo
- Model name: Dynamic Structure DEVS simulator
- Model description:
- A restricted PythonPDEVS simulator modelled in SCCD for classic, dynamic structure DEVS models.
- """
- from python_runtime.statecharts_core import *
- import cPickle as pickle
- import time
- import copy
- # We basically just interface with the basesimulator
- from scheduler import Scheduler
- from DEVS import directConnect, CoupledDEVS, AtomicDEVS, RootDEVS
- from classicDEVSWrapper import ClassicDEVSWrapper
- from tracer import trace
class Breakpoint(object):
    """Plain record describing one user-defined breakpoint.

    The constructor stores its arguments unchanged; nothing is validated.
    """

    def __init__(self, breakpoint_id, function, enabled, disable_on_trigger):
        # Identifier reported back when this breakpoint triggers.
        self.id = breakpoint_id
        # Source text of the breakpoint; the simulator exec()s it.
        self.function = function
        # Whether the breakpoint is checked at all, and whether it is
        # switched off once it has triggered.
        self.enabled, self.disable_on_trigger = enabled, disable_on_trigger
- # package "Dynamic Structure DEVS simulator"
- class SCCDSimulator(RuntimeClassBase):
def __init__(self, controller, model):
    """Initialise the runtime base, fix the statechart semantics and run
    the user-defined constructor with ``model``."""
    RuntimeClassBase.__init__(self, controller)

    # Semantic options of this statechart, applied in declaration order.
    chosen_semantics = (
        ("big_step_maximality", StatechartSemantics.TakeMany),
        ("internal_event_lifeline", StatechartSemantics.Queue),
        ("input_event_lifeline", StatechartSemantics.FirstComboStep),
        ("priority", StatechartSemantics.SourceParent),
        ("concurrency", StatechartSemantics.Single),
    )
    for option, value in chosen_semantics:
        setattr(self.semantics, option, value)

    # Call user defined constructor
    SCCDSimulator.user_defined_constructor(self, model)
-
def user_defined_constructor(self, model):
    """Keep ``model``, store a pickled snapshot of it and initialise the
    simulation bookkeeping."""
    self.model = model
    # Serialised copy of the model exactly as it was handed in.
    snapshot = pickle.dumps(model, pickle.HIGHEST_PROTOCOL)
    self.save_model = snapshot
    self.listeners_by_string = {}
    self.initialize_simulation()
-
def user_defined_destructor(self):
    """Nothing needs to be released; intentionally does nothing."""
    return None
-
-
- # User defined method
def initialize_simulation(self):
    """Reset every simulation variable to its start value and finalize
    the model."""
    # Termination settings stay unset until options are parsed.
    self.termination_time = None
    self.termination_condition = None

    # Simulation clock as a (time, age) pair.
    self.simulation_time = (0.0, 0)
    self.realtime_scale = 1.0
    self.root_model = self.model
    self.listeners = {}

    # Filled in while the simulation runs.
    self.realtime_starttime = None
    self.inject_queue = []

    # Model initialization: finalize() receives the same list object that
    # is kept as self.model_ids.
    registry = []
    self.model_ids = registry
    self.model.finalize(
        name="",
        model_counter=0,
        model_ids=registry,
        locations={None: []},
        select_hierarchy=[],
    )
-
-
- # User defined method
def serialize(self, type, object):
    """Convert simulator data into structures keyed by full names.

    ``type`` selects the conversion:
      * "imminents"     -> list of model full names
      * "imminent"      -> a single model full name
      * "outbag"        -> {port full name: [value]}
      * "inbags"        -> {port full name: [source port name, payload]},
                           or [payload] when the source entry is None
      * "new_tn" / "new_states" -> {model full name: value}
      * "transitioning" -> {model full name: "INTERNAL" | "EXTERNAL"}
      * "transitioned"  -> same labels, but keyed by the objects themselves
    The last three skip entries whose ``model_id`` is None. Any other
    ``type`` yields None.
    """
    # Labels for the numeric transition codes.
    labels = {1: "INTERNAL", 2: "EXTERNAL"}

    if type == "imminent":
        return object.getModelFullName()

    if type == "imminents":
        return [model.getModelFullName() for model in object]

    if type == "outbag":
        return {port.getPortFullName(): [object[port]] for port in object}

    if type == "inbags":
        converted = {}
        for port in object:
            if object[port][0] is not None:
                entry = [object[port][0].getPortFullName(), object[port][1]]
            else:
                entry = [object[port][1]]
            converted[port.getPortFullName()] = entry
        return converted

    if type in ("new_tn", "new_states"):
        return {model.getModelFullName(): object[model]
                for model in object if model.model_id is not None}

    if type == "transitioning":
        return {model.getModelFullName(): labels[object[model]]
                for model in object if model.model_id is not None}

    if type == "transitioned":
        return {model: labels[object[model]]
                for model in object if model.model_id is not None}

    return None
-
-
- # User defined method
def find_port_with_name(self, name):
    """Return the input port whose full name equals ``name``, or None."""
    for candidate in self.model.component_set:
        # The model name is only a prefix filter: port names may contain
        # dots themselves, so the full port name is compared below.
        if not name.startswith(candidate.getModelFullName()):
            continue
        for inport in candidate.IPorts:
            if inport.getPortFullName() == name:
                return inport
    # Nothing found
    return None
-
-
- # User defined method
def find_model_with_name(self, name):
    """Return the first component whose full name is exactly ``name``,
    or None when there is no such component."""
    exact_matches = (component for component in self.model.component_set
                     if name == component.getModelFullName())
    return next(exact_matches, None)
-
-
- # User defined method
def calculate_after(self):
    """Return how long to wait before the next realtime check.

    The wait is capped at 0.1 so the termination condition is checked
    repeatedly; a pending ``interrupt_string`` gives 0.0. If the values
    needed for the computation are missing or unset (``None``), the result
    is ``float('inf')``.
    """
    try:
        # Process in parts of 100 milliseconds to repeatedly check the
        # termination condition.
        if self.interrupt_string:
            nexttime = 0.0
        else:
            elapsed = (time.time() - self.realtime_starttime) / self.realtime_scale
            nexttime = (self.time_next[0] - elapsed) * self.realtime_scale
        return min(0.1, nexttime)
    except (TypeError, AttributeError):
        # Fixed: "except TypeError, AttributeError" only caught TypeError
        # (binding it to the name AttributeError), so a missing attribute
        # escaped instead of being treated like the non-realtime case.
        # We are probably not simulating in realtime...
        return float('inf')
-
-
- # User defined method
def parse_options(self, configuration):
    """Read the run options from ``configuration`` and restart the
    realtime reference clock.

    Recognised keys: "termination_condition", "termination_time" and
    "realtime_scale" (stored as its reciprocal). Missing keys fall back to
    None, None and 1.0 respectively.
    """
    if "termination_condition" in configuration:
        self.termination_condition = configuration["termination_condition"]
    else:
        self.termination_condition = None

    if "termination_time" in configuration:
        self.termination_time = configuration["termination_time"]
    else:
        self.termination_time = None

    if "realtime_scale" in configuration:
        self.realtime_scale = 1.0/configuration["realtime_scale"]
    else:
        self.realtime_scale = 1.0

    # Subtract the current simulation time to allow for pausing
    already_simulated = self.simulation_time[0]*self.realtime_scale
    self.realtime_starttime = (time.time() - already_simulated)
    # Reset the time used in the waiting, as it might not get recomputed
    self.the_time = 0.00001
-
-
- # User defined method
def should_terminate(self, realtime):
    """Decide whether the simulation has to stop at the time being checked.

    When ``realtime`` is true the current simulation time is checked;
    otherwise the next event time is recomputed and checked.

    Returns:
        -2       -> continue simulation
        -1       -> termination (infinite time, termination condition, or
                    termination time exceeded)
        otherwise the ``id`` of the first enabled breakpoint that triggered.

    Raises:
        NameError: if an enabled breakpoint's source does not define a
            function called ``breakpoint``.
    """
    if realtime:
        check_time = self.simulation_time
    else:
        self.compute_timeNext()
        check_time = self.time_next

    # Just access the 'transitioning' dictionary; fall back to empty
    # containers when a step has not produced the data yet.
    transitioned = set() if self.transitioning is None else self.transitioning
    outbag = self.outbag if self.outbag is not None else {}
    inbags = self.inbags if self.inbags is not None else {}

    if check_time[0] == float('inf'):
        # Always terminate if we have reached infinity
        terminate = True
    elif self.termination_condition is not None:
        terminate = self.termination_condition(check_time, self.root_model, transitioned)
    else:
        # NOTE(review): if termination_time is still None this compares
        # None with a number, which only works under Python 2 ordering
        # rules -- confirm that this is intended.
        terminate = self.termination_time < check_time[0]

    if terminate:
        # Always terminate, so don't check breakpoints
        return -1

    # Have to check breakpoints for termination
    for bp in self.breakpoints:
        if not bp.enabled:
            continue
        # SECURITY: this executes user-supplied source text; only accept
        # breakpoints from trusted users.
        # Fixed: each breakpoint is executed into its own namespace. A
        # bare exec into this method's locals let a source that does not
        # define "breakpoint" silently reuse the function of the previous
        # breakpoint. The source still has to start with "def breakpoint(".
        # TODO: report to the user if a breakpoint is invalid (catching exceptions).
        scope = {}
        exec(bp.function, globals(), scope)
        if "breakpoint" not in scope:
            raise NameError("name 'breakpoint' is not defined")
        check = scope["breakpoint"]
        if check(check_time, self.root_model, self.serialize('transitioned', transitioned), outbag, inbags):
            # Triggered!
            return bp.id
    return -2
-
-
- # User defined method
def find_internal_imminents(self):
    """Fetch the models imminent at the current simulation time, mark them
    for rescheduling and record the time as a transition time."""
    now = self.simulation_time
    due = self.model.scheduler.getImminent(now)
    self.imminents = due
    self.reschedule = set(due)
    self.transition_times.append(now)
    # Bump the age component of every imminent model's next time.
    for imminent_model in due:
        when = imminent_model.time_next
        imminent_model.time_next = (when[0], when[1] + 1)
-
-
- # User defined method
def select_imminent(self):
    """Pick the single model that transitions from ``self.imminents``.

    With several candidates, the ``select`` method of each level of the
    select hierarchy is consulted, top-down, until one candidate is left.
    The chosen model's age component of ``time_next`` is decremented.
    """
    self.imminent = None
    if len(self.imminents) <= 1:
        self.imminent = self.imminents[0]
    else:
        # Perform all selects
        self.imminents.sort()
        candidates = self.imminents
        depth = 1
        while len(candidates) > 1:
            # Take the model each time, as we need to make sure that the
            # select hierarchy is valid everywhere
            reference = candidates[0]
            selector = reference.select_hierarchy[depth - 1]
            # Go through a set first to remove duplicates
            colliding = list(set([m.select_hierarchy[depth] for m in candidates]))
            ordered = sorted(colliding, key=lambda i: i.getModelFullName())
            chosen = selector.select(ordered)
            candidates = [m for m in candidates
                          if m.select_hierarchy[depth] == chosen]
            depth += 1
        self.imminent = candidates[0]
    when = self.imminent.time_next
    self.imminent.time_next = (when[0], when[1] - 1)
-
-
- # User defined method
def compute_outputfunction(self):
    """Run the output function of the selected imminent model (through
    ClassicDEVSWrapper) and keep the result as ``self.outbag``."""
    wrapped = ClassicDEVSWrapper(self.imminent)
    self.outbag = wrapped.outputFnc()
-
-
- # User defined method
def route_messages(self):
    """Distribute ``self.outbag`` over the destination input ports.

    Fills ``self.inbags`` with ``{inport: [outport, payload]}``. When a
    route has a translation function ``z`` it is applied to a pickle
    round-trip copy of every message; routes without one receive the
    payload object as-is.
    """
    self.inbags = {}
    for outport in self.outbag:
        payload = self.outbag[outport]
        for inport, z in outport.routing_outline:
            # Fixed: the translated list used to be written back into
            # "payload", so later destinations of the same output port
            # received messages already translated for earlier ones.
            if z is None:
                routed = payload
            else:
                routed = [z(pickle.loads(pickle.dumps(m))) for m in payload]
            self.inbags[inport] = [outport, routed]
-
-
- # User defined method
def process_injects(self):
    """Add due injected events and a pending realtime interrupt to
    ``self.inbags``.

    Queue entries are dictionaries with "time", "port" and "event" keys;
    entries whose time is not later than the simulation time are consumed
    in order. An ``interrupt_string`` of the form "<portname> <value>" is
    routed from the root model's port of that name and then cleared.
    """
    while self.inject_queue:
        if self.inject_queue[0]["time"] > self.simulation_time:
            break
        config = self.inject_queue.pop(0)
        port = self.find_port_with_name(config["port"])
        # Identity test instead of "== None": a lookup miss is exactly
        # None, and equality could invoke a port's custom __eq__.
        if port is None:
            # NOTE(review): the entry has already been popped, so an
            # unknown port drops this inject and postpones the remaining
            # due ones -- confirm that "break" (not "continue") is wanted.
            break
        self.inbags[port] = [None, [config["event"]]]

    if self.interrupt_string:
        portname, event_value = self.interrupt_string.split(" ")
        source = self.root_model.ports[portname]
        for inport, z in source.routing_outline:
            ev = event_value if z is None else z(event_value)
            self.inbags[inport] = [source, [ev]]
        self.interrupt_string = None
-
-
- # User defined method
def find_all_imminents(self):
    """Build ``self.transitioning``: every model that transitions in this
    step, mapped to its transition code and wrapped in ClassicDEVSWrapper.

    Internal codes:
      1 --> internal transition
      2 --> external transition
      3 --> confluent transition
    These codes are a legacy of efficient PyPDEVS, kept for consistency.
    Models receiving input get their ``my_input`` filled and are added to
    the reschedule set.
    """
    self.transitioning = {self.imminent: 1}
    for inport in self.inbags:
        receiver = inport.host_DEVS
        receiver.my_input[inport] = self.inbags[inport][1]
        self.transitioning[receiver] = 2
        self.reschedule.add(receiver)

    # Re-key the dictionary by wrapped models.
    raw_codes = self.transitioning
    self.transitioning = {ClassicDEVSWrapper(model): raw_codes[model]
                          for model in raw_codes}
    for wrapped in self.transitioning:
        wrapped.server = self
    self.dc_altered = set()
-
-
- # User defined method
def compute_transitions(self):
    """Execute the transition function of every transitioning model.

    Code 1 runs ``intTransition``; code 2 sets ``elapsed`` and runs
    ``extTransition`` on a pickle round-trip copy of the input. The new
    state is appended (pickled, with the simulation time) to the model's
    ``old_states`` and collected in ``self.new_states``.
    """
    def isolated(value):
        # Pickle round-trip copy of the value.
        return pickle.loads(pickle.dumps(value, pickle.HIGHEST_PROTOCOL))

    self.new_states = {}
    for model in self.transitioning:
        model.my_input = {port: isolated(model.my_input[port]) for port in model.my_input}
        code = self.transitioning[model]
        if code == 1:
            model.state = model.intTransition()
        elif code == 2:
            model.elapsed = self.simulation_time[0] - model.time_last[0]
            model.state = model.extTransition(model.my_input)
        snapshot = (self.simulation_time, pickle.dumps(model.state))
        model.old_states.append(snapshot)
        model.my_input = {}
        self.new_states[model] = model.state
-
-
- # User defined method
def compute_ta(self):
    """Recompute ``time_last``/``time_next`` of every transitioning model,
    trace the transition and reschedule the affected models."""
    self.new_tn = {}
    now, age = self.simulation_time
    for model in self.transitioning:
        ta = model.timeAdvance()
        model.time_last = self.simulation_time
        # A zero time advance keeps the same time and increases the age;
        # otherwise the age restarts at 1.
        if ta:
            next_age = 1
        else:
            next_age = age + 1
        model.time_next = (now + ta, next_age)
        self.new_tn[model] = model.time_next
        trace(self.trace_file, self.transitioning[model], model)
    scheduler = self.model.scheduler
    scheduler.massReschedule(self.reschedule)
    self.time_next = self.model.scheduler.readFirst()
-
-
- # User defined method
def dsAddPort(self, port):
    """Register a newly added port: mark it as altered and log it under
    'CREATED_PORTS' as (full name, is_input)."""
    self.dc_altered.add(port)
    created = self.structural_changes.setdefault('CREATED_PORTS', [])
    created.append((port.getPortFullName(), port.is_input))
-
-
- # User defined method
def dsRemovePort(self, port):
    """Detach ``port`` from every port connected to it and log its removal
    under 'DELETED_PORTS'."""
    # Ports feeding into the removed port.
    for source in port.inline:
        source.outline = [p for p in source.outline if p != port]
        self.dsDisconnectPorts(source, port)
    # Ports fed by the removed port.
    for target in port.outline:
        target.inline = [p for p in target.inline if p != port]
        self.dsDisconnectPorts(port, target)
    self.dc_altered.add(port)
    deleted = self.structural_changes.setdefault('DELETED_PORTS', [])
    deleted.append(port.getPortFullName())
-
-
- # User defined method
def dsDisconnectPorts(self, p1, p2):
    """Mark ``p1`` as altered and log the removed connection p1 -> p2
    under 'DISCONNECTED_PORTS'."""
    self.dc_altered.add(p1)
    link = (p1.getPortFullName(), p2.getPortFullName())
    self.structural_changes.setdefault('DISCONNECTED_PORTS', []).append(link)
-
-
- # User defined method
def dsConnectPorts(self, p1, p2):
    """Mark ``p1`` as altered and log the new connection p1 -> p2 under
    'CONNECTED_PORTS'."""
    self.dc_altered.add(p1)
    link = (p1.getPortFullName(), p2.getPortFullName())
    self.structural_changes.setdefault('CONNECTED_PORTS', []).append(link)
-
-
- # User defined method
def dsUnscheduleModel(self, model):
    """Remove ``model`` from the running simulation.

    A coupled model is handled by unscheduling each of its components; an
    atomic model is removed from the root model's component and model
    lists, from the scheduler and from the id bookkeeping. Afterwards all
    ports of ``model`` are removed and the model is logged under
    'DELETED_MODELS'.

    Raises:
        DEVSException: if ``model`` is neither coupled nor atomic.
    """
    if isinstance(model, CoupledDEVS):
        for m in model.component_set:
            # Fixed: the recursive call used to pass an extra positional
            # argument, which this one-parameter method does not accept.
            self.dsUnscheduleModel(m)
    elif isinstance(model, AtomicDEVS):
        self.model.component_set.remove(model)
        self.model.models.remove(model)
        # The model is removed, so remove it from the scheduler
        self.model.scheduler.unschedule(model)
        # Keep the slot so the ids of other models stay valid.
        self.model_ids[model.model_id] = None
        self.model.local_model_ids.remove(model.model_id)
    else:
        # NOTE(review): DEVSException is not among the names explicitly
        # imported at the top of this file; presumably a star import
        # provides it -- confirm.
        # The message used to say "schedule" in this unschedule path.
        raise DEVSException("Unknown model to unschedule: %s" % model)
    for port in model.IPorts:
        self.dsRemovePort(port)
    for port in model.OPorts:
        self.dsRemovePort(port)
    self.structural_changes.setdefault('DELETED_MODELS', []).append(model.getModelFullName())
-
-
- # User defined method
def dsScheduleModel(self, model):
    """Add ``model`` to the running simulation.

    Coupled models get their full name set and have their components
    scheduled one by one. Atomic models receive an id, a full name,
    initial ``time_last``/``time_next`` values and a select hierarchy, and
    are handed to the scheduler. The model and its ports are logged under
    'CREATED_MODELS' and 'CREATED_PORTS'.

    Raises:
        DEVSException: on a negative time advance or an unknown model kind.
    """
    if isinstance(model, CoupledDEVS):
        model.full_name = model.parent.full_name + "." + model.getModelName()
        for child in model.component_set:
            self.dsScheduleModel(child)
    elif isinstance(model, AtomicDEVS):
        # Register the model everywhere it has to be known.
        model.model_id = len(self.model_ids)
        model.full_name = model.parent.full_name + "." + model.getModelName()
        self.model_ids.append(model)
        self.model.component_set.append(model)
        self.model.models.append(model)
        self.model.local_model_ids.add(model.model_id)

        # Initial timing information.
        model.time_last = (self.simulation_time[0] - model.elapsed, 1)
        ta = model.timeAdvance()
        if ta < 0:
            raise DEVSException(
                "Negative time advance in atomic model '"
                + model.getModelFullName() + "' with value "
                + str(ta) + " at initialisation")
        model.time_next = (model.time_last[0] + ta, 1)
        model.old_states.append((self.simulation_time, pickle.dumps(model.state)))

        # Select hierarchy: all ancestors from the root down to the model.
        lineage = [model]
        ancestor = model.parent
        while ancestor != None:
            lineage = [ancestor] + lineage
            ancestor = ancestor.parent
        model.select_hierarchy = lineage

        # If scheduled for 'now', update the age manually
        if model.time_next[0] == self.simulation_time[0]:
            model.time_next = (model.time_next[0], self.simulation_time[1])
        # It is a new model, so add it to the scheduler too
        self.model.scheduler.schedule(model)
    else:
        raise DEVSException("Unknown model to schedule: %s" % model)

    self.structural_changes.setdefault('CREATED_MODELS', []).append(model.getModelFullName())
    new_ports = [(port.getPortFullName(), port.is_input) for port in model.ports.itervalues()]
    self.structural_changes.setdefault('CREATED_PORTS', []).extend(new_ports)
-
-
- # User defined method
def getSelfProxy(self):
    """Return this simulator itself as the proxy object."""
    proxy = self
    return proxy
-
-
- # User defined method
def process_structure_changes(self):
    """Run the dynamic-structure ``modelTransition`` calls for this step.

    Every transitioning model is asked first; whenever a model returns a
    true value its parent is asked as well, level by level. Each parent is
    asked at most once. If any port was marked as altered, the direct
    connection is redone. Finally the ``server`` reference of all
    transitioning models is cleared.
    """
    dsdevs_dict = {}
    iterlist = [aDEVS.parent for aDEVS in self.transitioning
                if aDEVS.modelTransition(dsdevs_dict)]
    # Contains all models that are already checked, to prevent duplicate
    # checking. This is not necessary for atomic models: they have no
    # children that could induce a structural change on them.
    checked = set()
    while iterlist:
        new_iterlist = []
        for cDEVS in iterlist:
            # Fixed: the None guard now comes before the attribute
            # assignment. A model without a parent that returned True
            # used to crash here with an AttributeError on None.
            if cDEVS is None:
                # The root returned True in modelTransition; ignore it.
                continue
            if cDEVS in checked:
                continue
            checked.add(cDEVS)
            cDEVS.server = self
            if cDEVS.modelTransition(dsdevs_dict):
                new_iterlist.append(cDEVS.parent)
            cDEVS.server = None
        # Don't update the iterlist while we are iterating over it
        iterlist = new_iterlist
    if self.dc_altered:
        self.model.redoDirectConnection(self.dc_altered)
    for m in self.transitioning:
        m.server = None
-
-
- # User defined method
def flush_file(self):
    """Flush the trace file, if one is set."""
    target = self.trace_file
    if target is None:
        return
    target.flush()
-
-
- # User defined method
def process_breakpoints(self, realtime):
    """Run the termination check and disable every breakpoint with the
    triggered id that asks to be disabled on trigger.

    Returns the value produced by ``should_terminate(realtime)``.
    """
    triggered_id = self.should_terminate(realtime)
    for bp in self.breakpoints:
        if bp.id == triggered_id and bp.disable_on_trigger:
            bp.enabled = False
    return triggered_id
-
-
- # User defined method
def compute_timeNext(self):
    """Set ``self.time_next`` to the scheduler's first event time, or to
    the time of the first queued inject when that one is earlier."""
    upcoming = self.model.scheduler.readFirst()
    if self.inject_queue:
        upcoming = min(upcoming, self.inject_queue[0]["time"])
    self.time_next = upcoming
-
-
- # User defined method
def rollback_step(self):
    """Undo the most recent transition time.

    Every component whose latest saved state belongs to that time gets its
    previous saved state restored, its times recomputed, and is
    rescheduled. The simulation time falls back to the previous transition
    time, or to (0.0, 0) when none is left. Does nothing when no
    transition time has been recorded.
    """
    if not self.transition_times:
        return
    undone_time = self.transition_times.pop()
    for component in self.model.component_set:
        history = component.old_states
        if history[-1][0] == undone_time:
            # Remove the current state
            history.pop()
            # Set the new (old...) state
            restored = history[-1]
            component.state = pickle.loads(restored[1])
            component.time_last = restored[0]
            ta = component.timeAdvance()
            if ta == 0:
                age = component.time_last[1] + 1
            else:
                age = 0
            component.time_next = (component.time_last[0] + ta, age)
            self.model.scheduler.massReschedule([component])
    if self.transition_times:
        self.simulation_time = self.transition_times[-1]
    else:
        self.simulation_time = (0.0, 0)
-
def initializeStatechart(self):
    """Give every composite state an empty list of active children and
    enter the default state ``Root_initializing``."""
    composite_states = (
        self.Root,
        self.Root_main,
        self.Root_main_injection_monitor,
        self.Root_main_tracer_monitor,
        self.Root_main_realtime_interrupt_monitor,
        self.Root_main_simulation_flow,
        self.Root_main_simulation_flow_check_termination,
        self.Root_main_simulation_flow_do_simulation,
        self.Root_main_simulation_state,
        self.Root_main_breakpoint_manager,
        self.Root_main_listeners,
        self.Root_main_reset,
    )
    for state in composite_states:
        self.current_state[state] = []

    # Enter default state
    self.enter_Root_initializing()
-
- # Unique IDs for all statechart nodes
# Composite states (these own an entry in current_state).
Root = 0
Root_main = 1
Root_main_injection_monitor = 2
Root_main_tracer_monitor = 3
Root_main_realtime_interrupt_monitor = 4
Root_main_simulation_flow = 5
Root_main_simulation_flow_check_termination = 6
Root_main_simulation_flow_do_simulation = 7
Root_main_simulation_state = 8
Root_main_breakpoint_manager = 9
Root_main_listeners = 10
Root_main_reset = 11

# Basic states, grouped by their parent.
Root_initializing = 12

Root_main_injection_monitor_inject = 13
Root_main_tracer_monitor_trace = 14
Root_main_realtime_interrupt_monitor_realtime_interrupt = 15

Root_main_simulation_flow_initialize = 16

Root_main_simulation_flow_check_termination_workaround = 17
Root_main_simulation_flow_check_termination_wait = 18
Root_main_simulation_flow_check_termination_small_step_check = 19
Root_main_simulation_flow_check_termination_check_termination = 20

Root_main_simulation_flow_do_simulation_init = 21
Root_main_simulation_flow_do_simulation_found_internal_imminents = 22
Root_main_simulation_flow_do_simulation_selected_imminent = 23
Root_main_simulation_flow_do_simulation_computed_outputfunction = 24
Root_main_simulation_flow_do_simulation_routed_messages = 25
Root_main_simulation_flow_do_simulation_found_all_imminents = 26
Root_main_simulation_flow_do_simulation_computed_transitions = 27
Root_main_simulation_flow_do_simulation_computed_ta = 28

Root_main_simulation_state_paused = 29
Root_main_simulation_state_continuous = 30
Root_main_simulation_state_realtime = 31
Root_main_simulation_state_big_step = 32

Root_main_breakpoint_manager_breakpoint_manage = 33
Root_main_listeners_listening = 34
Root_main_reset_reset = 35
-
- # Statechart enter/exit action method(s)
-
def enter_Root_main(self):
    """Record ``Root_main`` as an active child of ``Root``."""
    active_children = self.current_state[self.Root]
    active_children.append(self.Root_main)
-
def exit_Root_main(self):
    """Exit every child of ``Root_main`` in a fixed order, then clear the
    active children of ``Root``."""
    child_exits = (
        self.exit_Root_main_injection_monitor,
        self.exit_Root_main_tracer_monitor,
        self.exit_Root_main_realtime_interrupt_monitor,
        self.exit_Root_main_simulation_flow,
        self.exit_Root_main_simulation_state,
        self.exit_Root_main_breakpoint_manager,
        self.exit_Root_main_listeners,
        self.exit_Root_main_reset,
    )
    for leave in child_exits:
        leave()
    self.current_state[self.Root] = []
-
def enter_Root_main_injection_monitor(self):
    """Record ``Root_main_injection_monitor`` as an active child of
    ``Root_main``."""
    active_children = self.current_state[self.Root_main]
    active_children.append(self.Root_main_injection_monitor)
-
def exit_Root_main_injection_monitor(self):
    """Exit the ``inject`` child if it is active, then clear the active
    children of ``Root_main``."""
    active_children = self.current_state[self.Root_main_injection_monitor]
    if self.Root_main_injection_monitor_inject in active_children:
        self.exit_Root_main_injection_monitor_inject()
    self.current_state[self.Root_main] = []
-
def enter_Root_main_tracer_monitor(self):
    """Record ``Root_main_tracer_monitor`` as an active child of
    ``Root_main``."""
    active_children = self.current_state[self.Root_main]
    active_children.append(self.Root_main_tracer_monitor)
-
def exit_Root_main_tracer_monitor(self):
    """Exit the ``trace`` child if it is active, then clear the active
    children of ``Root_main``."""
    active_children = self.current_state[self.Root_main_tracer_monitor]
    if self.Root_main_tracer_monitor_trace in active_children:
        self.exit_Root_main_tracer_monitor_trace()
    self.current_state[self.Root_main] = []
-
def enter_Root_main_realtime_interrupt_monitor(self):
    """Record ``Root_main_realtime_interrupt_monitor`` as an active child
    of ``Root_main``."""
    active_children = self.current_state[self.Root_main]
    active_children.append(self.Root_main_realtime_interrupt_monitor)
-
def exit_Root_main_realtime_interrupt_monitor(self):
    """Exit the ``realtime_interrupt`` child if it is active, then clear
    the active children of ``Root_main``."""
    active_children = self.current_state[self.Root_main_realtime_interrupt_monitor]
    if self.Root_main_realtime_interrupt_monitor_realtime_interrupt in active_children:
        self.exit_Root_main_realtime_interrupt_monitor_realtime_interrupt()
    self.current_state[self.Root_main] = []
-
def enter_Root_main_simulation_flow(self):
    """Record ``Root_main_simulation_flow`` as an active child of
    ``Root_main``."""
    active_children = self.current_state[self.Root_main]
    active_children.append(self.Root_main_simulation_flow)
-
def exit_Root_main_simulation_flow(self):
    """Exit whichever child of ``Root_main_simulation_flow`` is active,
    then clear the active children of ``Root_main``."""
    children = (
        (self.Root_main_simulation_flow_initialize,
         self.exit_Root_main_simulation_flow_initialize),
        (self.Root_main_simulation_flow_check_termination,
         self.exit_Root_main_simulation_flow_check_termination),
        (self.Root_main_simulation_flow_do_simulation,
         self.exit_Root_main_simulation_flow_do_simulation),
    )
    for child, leave in children:
        # Look the list up again each time: a child exit replaces it.
        if child in self.current_state[self.Root_main_simulation_flow]:
            leave()
    self.current_state[self.Root_main] = []
-
def enter_Root_main_simulation_flow_check_termination(self):
    """Record ``check_termination`` as an active child of
    ``Root_main_simulation_flow``."""
    active_children = self.current_state[self.Root_main_simulation_flow]
    active_children.append(self.Root_main_simulation_flow_check_termination)
-
def exit_Root_main_simulation_flow_check_termination(self):
    """Exit the active child of ``check_termination``, advance the
    simulation time to ``time_next`` and clear the active children of
    ``Root_main_simulation_flow``."""
    children = (
        (self.Root_main_simulation_flow_check_termination_workaround,
         self.exit_Root_main_simulation_flow_check_termination_workaround),
        (self.Root_main_simulation_flow_check_termination_wait,
         self.exit_Root_main_simulation_flow_check_termination_wait),
        (self.Root_main_simulation_flow_check_termination_small_step_check,
         self.exit_Root_main_simulation_flow_check_termination_small_step_check),
        (self.Root_main_simulation_flow_check_termination_check_termination,
         self.exit_Root_main_simulation_flow_check_termination_check_termination),
    )
    for child, leave in children:
        # Look the list up again each time: a child exit replaces it.
        if child in self.current_state[self.Root_main_simulation_flow_check_termination]:
            leave()
    # Exit action of the state itself.
    self.simulation_time = self.time_next
    self.current_state[self.Root_main_simulation_flow] = []
-
def enter_Root_main_simulation_flow_do_simulation(self):
    """Record ``do_simulation`` as an active child of
    ``Root_main_simulation_flow``."""
    active_children = self.current_state[self.Root_main_simulation_flow]
    active_children.append(self.Root_main_simulation_flow_do_simulation)
-
def exit_Root_main_simulation_flow_do_simulation(self):
    """Exit whichever step of ``do_simulation`` is active, then clear the
    active children of ``Root_main_simulation_flow``."""
    steps = (
        (self.Root_main_simulation_flow_do_simulation_init,
         self.exit_Root_main_simulation_flow_do_simulation_init),
        (self.Root_main_simulation_flow_do_simulation_found_internal_imminents,
         self.exit_Root_main_simulation_flow_do_simulation_found_internal_imminents),
        (self.Root_main_simulation_flow_do_simulation_selected_imminent,
         self.exit_Root_main_simulation_flow_do_simulation_selected_imminent),
        (self.Root_main_simulation_flow_do_simulation_computed_outputfunction,
         self.exit_Root_main_simulation_flow_do_simulation_computed_outputfunction),
        (self.Root_main_simulation_flow_do_simulation_routed_messages,
         self.exit_Root_main_simulation_flow_do_simulation_routed_messages),
        (self.Root_main_simulation_flow_do_simulation_found_all_imminents,
         self.exit_Root_main_simulation_flow_do_simulation_found_all_imminents),
        (self.Root_main_simulation_flow_do_simulation_computed_transitions,
         self.exit_Root_main_simulation_flow_do_simulation_computed_transitions),
        (self.Root_main_simulation_flow_do_simulation_computed_ta,
         self.exit_Root_main_simulation_flow_do_simulation_computed_ta),
    )
    for step, leave in steps:
        # Look the list up again each time: a child exit replaces it.
        if step in self.current_state[self.Root_main_simulation_flow_do_simulation]:
            leave()
    self.current_state[self.Root_main_simulation_flow] = []
-
def enter_Root_main_simulation_state(self):
    """Record ``Root_main_simulation_state`` as an active child of
    ``Root_main``."""
    active_children = self.current_state[self.Root_main]
    active_children.append(self.Root_main_simulation_state)
-
def exit_Root_main_simulation_state(self):
    """Exit whichever simulation mode is active, then clear the active
    children of ``Root_main``."""
    modes = (
        (self.Root_main_simulation_state_paused,
         self.exit_Root_main_simulation_state_paused),
        (self.Root_main_simulation_state_continuous,
         self.exit_Root_main_simulation_state_continuous),
        (self.Root_main_simulation_state_realtime,
         self.exit_Root_main_simulation_state_realtime),
        (self.Root_main_simulation_state_big_step,
         self.exit_Root_main_simulation_state_big_step),
    )
    for mode, leave in modes:
        # Look the list up again each time: a child exit replaces it.
        if mode in self.current_state[self.Root_main_simulation_state]:
            leave()
    self.current_state[self.Root_main] = []
-
def enter_Root_main_breakpoint_manager(self):
    """Record ``Root_main_breakpoint_manager`` as an active child of
    ``Root_main``."""
    active_children = self.current_state[self.Root_main]
    active_children.append(self.Root_main_breakpoint_manager)
-
def exit_Root_main_breakpoint_manager(self):
    """Exit the ``breakpoint_manage`` child if it is active, then clear
    the active children of ``Root_main``."""
    active_children = self.current_state[self.Root_main_breakpoint_manager]
    if self.Root_main_breakpoint_manager_breakpoint_manage in active_children:
        self.exit_Root_main_breakpoint_manager_breakpoint_manage()
    self.current_state[self.Root_main] = []
-
def enter_Root_main_listeners(self):
    """Record ``Root_main_listeners`` as an active child of ``Root_main``."""
    active_children = self.current_state[self.Root_main]
    active_children.append(self.Root_main_listeners)
-
def exit_Root_main_listeners(self):
    """Exit the ``listening`` child if it is active, then clear the active
    children of ``Root_main``."""
    active_children = self.current_state[self.Root_main_listeners]
    if self.Root_main_listeners_listening in active_children:
        self.exit_Root_main_listeners_listening()
    self.current_state[self.Root_main] = []
-
def enter_Root_main_reset(self):
    """Record ``Root_main_reset`` as an active child of ``Root_main``."""
    active_children = self.current_state[self.Root_main]
    active_children.append(self.Root_main_reset)
-
def exit_Root_main_reset(self):
    """Exit the ``reset`` child if it is active, then clear the active
    children of ``Root_main``."""
    active_children = self.current_state[self.Root_main_reset]
    if self.Root_main_reset_reset in active_children:
        self.exit_Root_main_reset_reset()
    self.current_state[self.Root_main] = []
-
def enter_Root_initializing(self):
    """Record ``Root_initializing`` as an active child of ``Root``."""
    active_children = self.current_state[self.Root]
    active_children.append(self.Root_initializing)
-
def exit_Root_initializing(self):
    """Clear the active children of ``Root``."""
    parent = self.Root
    self.current_state[parent] = []
-
def enter_Root_main_injection_monitor_inject(self):
    """Record ``inject`` as an active child of
    ``Root_main_injection_monitor``."""
    active_children = self.current_state[self.Root_main_injection_monitor]
    active_children.append(self.Root_main_injection_monitor_inject)
-
def exit_Root_main_injection_monitor_inject(self):
    """Clear the active children of ``Root_main_injection_monitor``."""
    parent = self.Root_main_injection_monitor
    self.current_state[parent] = []
-
def enter_Root_main_tracer_monitor_trace(self):
    """Record ``trace`` as an active child of ``Root_main_tracer_monitor``."""
    active_children = self.current_state[self.Root_main_tracer_monitor]
    active_children.append(self.Root_main_tracer_monitor_trace)
-
def exit_Root_main_tracer_monitor_trace(self):
    """Clear the active children of ``Root_main_tracer_monitor``."""
    parent = self.Root_main_tracer_monitor
    self.current_state[parent] = []
-
def enter_Root_main_realtime_interrupt_monitor_realtime_interrupt(self):
    """Record ``realtime_interrupt`` as an active child of
    ``Root_main_realtime_interrupt_monitor``."""
    active_children = self.current_state[self.Root_main_realtime_interrupt_monitor]
    active_children.append(self.Root_main_realtime_interrupt_monitor_realtime_interrupt)
-
def exit_Root_main_realtime_interrupt_monitor_realtime_interrupt(self):
    """Clear the active children of
    ``Root_main_realtime_interrupt_monitor``."""
    parent = self.Root_main_realtime_interrupt_monitor
    self.current_state[parent] = []
-
def enter_Root_main_simulation_flow_initialize(self):
    """Describe the whole initial model as structural changes, then record
    ``initialize`` as an active child of ``Root_main_simulation_flow``.

    Starting from the root model, every model, every port and every
    outgoing connection is logged under 'CREATED_MODELS', 'CREATED_PORTS'
    and 'CONNECTED_PORTS'.
    """
    models, ports, links = [], [], []
    self.structural_changes = {'CREATED_MODELS': models, 'CREATED_PORTS': ports, 'CONNECTED_PORTS': links}

    pending = [self.root_model]
    while pending:
        node = pending.pop()
        models.append(node.getModelFullName())
        for port in node.ports.itervalues():
            ports.append((port.getPortFullName(), port.is_input))
        for outport in node.OPorts:
            for target in outport.outline:
                links.append((outport.getPortFullName(), target.getPortFullName()))
        if isinstance(node, CoupledDEVS):
            # Input ports of a coupled model are connected onwards too.
            for inport in node.IPorts:
                for target in inport.outline:
                    links.append((inport.getPortFullName(), target.getPortFullName()))
            pending.extend(node.component_set)

    active_children = self.current_state[self.Root_main_simulation_flow]
    active_children.append(self.Root_main_simulation_flow_initialize)
-
def exit_Root_main_simulation_flow_initialize(self):
    """Clear the active children of ``Root_main_simulation_flow``."""
    parent = self.Root_main_simulation_flow
    self.current_state[parent] = []
-
def enter_Root_main_simulation_flow_check_termination_workaround(self):
    """Set timer 0 to 0 and record ``workaround`` as an active child of
    ``check_termination``."""
    self.timers[0] = 0
    active_children = self.current_state[self.Root_main_simulation_flow_check_termination]
    active_children.append(self.Root_main_simulation_flow_check_termination_workaround)
-
def exit_Root_main_simulation_flow_check_termination_workaround(self):
    """Drop timer 0 (if present) and clear the active children of
    ``check_termination``."""
    self.timers.pop(0, None)
    parent = self.Root_main_simulation_flow_check_termination
    self.current_state[parent] = []
-
def enter_Root_main_simulation_flow_check_termination_wait(self):
    """Set timer 1 to the computed waiting time and record ``wait`` as an
    active child of ``check_termination``."""
    delay = self.calculate_after()
    self.timers[1] = delay
    active_children = self.current_state[self.Root_main_simulation_flow_check_termination]
    active_children.append(self.Root_main_simulation_flow_check_termination_wait)
-
def exit_Root_main_simulation_flow_check_termination_wait(self):
    """Drop timer 1, derive the simulation time from the elapsed
    wall-clock time and clear the active children of
    ``check_termination``."""
    self.timers.pop(1, None)
    elapsed = time.time() - self.realtime_starttime
    self.simulation_time = (elapsed / self.realtime_scale, 1)
    parent = self.Root_main_simulation_flow_check_termination
    self.current_state[parent] = []
-
def enter_Root_main_simulation_flow_check_termination_small_step_check(self):
    """Record ``small_step_check`` as an active child of
    ``check_termination``."""
    active_children = self.current_state[self.Root_main_simulation_flow_check_termination]
    active_children.append(self.Root_main_simulation_flow_check_termination_small_step_check)
-
def exit_Root_main_simulation_flow_check_termination_small_step_check(self):
    """Clear the active children of ``check_termination``."""
    parent = self.Root_main_simulation_flow_check_termination
    self.current_state[parent] = []
-
def enter_Root_main_simulation_flow_check_termination_check_termination(self):
    """Recompute the next event time and the waiting time, then record the
    inner ``check_termination`` state as an active child of the outer
    ``check_termination`` state."""
    self.compute_timeNext()
    wait_for = self.calculate_after()
    self.the_time = wait_for
    active_children = self.current_state[self.Root_main_simulation_flow_check_termination]
    active_children.append(self.Root_main_simulation_flow_check_termination_check_termination)
-
def exit_Root_main_simulation_flow_check_termination_check_termination(self):
    """Clear the active children of the outer ``check_termination``
    state."""
    parent = self.Root_main_simulation_flow_check_termination
    self.current_state[parent] = []
-
def enter_Root_main_simulation_flow_do_simulation_init(self):
    """Record ``init`` as an active child of ``do_simulation``."""
    active_children = self.current_state[self.Root_main_simulation_flow_do_simulation]
    active_children.append(self.Root_main_simulation_flow_do_simulation_init)
-
def exit_Root_main_simulation_flow_do_simulation_init(self):
    """Find the internally imminent models, then clear the active children
    of ``do_simulation``."""
    self.find_internal_imminents()
    parent = self.Root_main_simulation_flow_do_simulation
    self.current_state[parent] = []
-
def enter_Root_main_simulation_flow_do_simulation_found_internal_imminents(self):
    """Record ``found_internal_imminents`` as an active child of
    ``do_simulation``."""
    active_children = self.current_state[self.Root_main_simulation_flow_do_simulation]
    active_children.append(self.Root_main_simulation_flow_do_simulation_found_internal_imminents)
-
def exit_Root_main_simulation_flow_do_simulation_found_internal_imminents(self):
    """Select the imminent model, then clear the active children of
    ``do_simulation``."""
    self.select_imminent()
    parent = self.Root_main_simulation_flow_do_simulation
    self.current_state[parent] = []
-
def enter_Root_main_simulation_flow_do_simulation_selected_imminent(self):
    """Record ``selected_imminent`` as an active child of
    ``do_simulation``."""
    active_children = self.current_state[self.Root_main_simulation_flow_do_simulation]
    active_children.append(self.Root_main_simulation_flow_do_simulation_selected_imminent)
-
def exit_Root_main_simulation_flow_do_simulation_selected_imminent(self):
    """Compute the output function, then clear the active children of
    ``do_simulation``."""
    self.compute_outputfunction()
    parent = self.Root_main_simulation_flow_do_simulation
    self.current_state[parent] = []
-
def enter_Root_main_simulation_flow_do_simulation_computed_outputfunction(self):
    """Record ``computed_outputfunction`` as an active child of
    ``do_simulation``."""
    active_children = self.current_state[self.Root_main_simulation_flow_do_simulation]
    active_children.append(self.Root_main_simulation_flow_do_simulation_computed_outputfunction)
-
def exit_Root_main_simulation_flow_do_simulation_computed_outputfunction(self):
    """Route the output messages, process the injects and clear the active
    children of ``do_simulation``."""
    for action in (self.route_messages, self.process_injects):
        action()
    parent = self.Root_main_simulation_flow_do_simulation
    self.current_state[parent] = []
-
def enter_Root_main_simulation_flow_do_simulation_routed_messages(self):
    """Record ``routed_messages`` as an active child of ``do_simulation``."""
    active_children = self.current_state[self.Root_main_simulation_flow_do_simulation]
    active_children.append(self.Root_main_simulation_flow_do_simulation_routed_messages)
-
def exit_Root_main_simulation_flow_do_simulation_routed_messages(self):
    """Determine all transitioning models, then clear the active children
    of ``do_simulation``."""
    self.find_all_imminents()
    parent = self.Root_main_simulation_flow_do_simulation
    self.current_state[parent] = []
-
def enter_Root_main_simulation_flow_do_simulation_found_all_imminents(self):
    """Record ``found_all_imminents`` as an active child of
    ``do_simulation``."""
    active_children = self.current_state[self.Root_main_simulation_flow_do_simulation]
    active_children.append(self.Root_main_simulation_flow_do_simulation_found_all_imminents)
-
def exit_Root_main_simulation_flow_do_simulation_found_all_imminents(self):
    """Exit action of ``do_simulation/found_all_imminents``.

    Calls ``compute_transitions()`` and then empties the active-child list of
    ``do_simulation`` in ``current_state``.
    """
    self.compute_transitions()
    region = self.Root_main_simulation_flow_do_simulation
    self.current_state[region] = []
-
def enter_Root_main_simulation_flow_do_simulation_computed_transitions(self):
    """Append ``computed_transitions`` to the active children of ``do_simulation``."""
    active_children = self.current_state[self.Root_main_simulation_flow_do_simulation]
    active_children.append(self.Root_main_simulation_flow_do_simulation_computed_transitions)
-
def exit_Root_main_simulation_flow_do_simulation_computed_transitions(self):
    """Exit action of ``do_simulation/computed_transitions``.

    Calls ``compute_ta()`` and then empties the active-child list of
    ``do_simulation`` in ``current_state``.
    """
    self.compute_ta()
    region = self.Root_main_simulation_flow_do_simulation
    self.current_state[region] = []
-
def enter_Root_main_simulation_flow_do_simulation_computed_ta(self):
    """Append ``computed_ta`` to the active children of ``do_simulation``."""
    active_children = self.current_state[self.Root_main_simulation_flow_do_simulation]
    active_children.append(self.Root_main_simulation_flow_do_simulation_computed_ta)
-
def exit_Root_main_simulation_flow_do_simulation_computed_ta(self):
    """Exit action of ``do_simulation/computed_ta``.

    Calls ``process_structure_changes()`` and then empties the active-child
    list of ``do_simulation`` in ``current_state``.
    """
    self.process_structure_changes()
    region = self.Root_main_simulation_flow_do_simulation
    self.current_state[region] = []
-
def enter_Root_main_simulation_state_paused(self):
    """Append ``paused`` to the active children of ``simulation_state``."""
    active_children = self.current_state[self.Root_main_simulation_state]
    active_children.append(self.Root_main_simulation_state_paused)
-
def exit_Root_main_simulation_state_paused(self):
    """Empty the active-child list of ``simulation_state`` in ``current_state``."""
    region = self.Root_main_simulation_state
    self.current_state[region] = []
-
def enter_Root_main_simulation_state_continuous(self):
    """Enter ``simulation_state/continuous``.

    Resolves every port name in ``listeners_by_string`` to the attribute of
    the same name on ``root_model``, stores the resulting port -> listener
    mapping on both the simulator and the model, recomputes the model's
    ``component_set`` with ``directConnect`` and finally records
    ``continuous`` as the active child of ``simulation_state``.
    """
    resolved = {}
    for portname, listener in self.listeners_by_string.iteritems():
        resolved[getattr(self.root_model, portname)] = listener
    self.listeners = resolved
    self.model.listeners = self.listeners
    self.model.component_set = directConnect(self.model.models, self.listeners)
    active_children = self.current_state[self.Root_main_simulation_state]
    active_children.append(self.Root_main_simulation_state_continuous)
-
def exit_Root_main_simulation_state_continuous(self):
    """Empty the active-child list of ``simulation_state`` in ``current_state``."""
    region = self.Root_main_simulation_state
    self.current_state[region] = []
-
def enter_Root_main_simulation_state_realtime(self):
    """Enter ``simulation_state/realtime``.

    Resolves every port name in ``listeners_by_string`` to the attribute of
    the same name on ``root_model``, stores the resulting port -> listener
    mapping on both the simulator and the model, recomputes the model's
    ``component_set`` with ``directConnect`` and finally records ``realtime``
    as the active child of ``simulation_state``.
    """
    resolved = {}
    for portname, listener in self.listeners_by_string.iteritems():
        resolved[getattr(self.root_model, portname)] = listener
    self.listeners = resolved
    self.model.listeners = self.listeners
    self.model.component_set = directConnect(self.model.models, self.listeners)
    active_children = self.current_state[self.Root_main_simulation_state]
    active_children.append(self.Root_main_simulation_state_realtime)
-
def exit_Root_main_simulation_state_realtime(self):
    """Empty the active-child list of ``simulation_state`` in ``current_state``."""
    region = self.Root_main_simulation_state
    self.current_state[region] = []
-
def enter_Root_main_simulation_state_big_step(self):
    """Enter ``simulation_state/big_step``.

    Resolves every port name in ``listeners_by_string`` to the attribute of
    the same name on ``root_model``, stores the resulting port -> listener
    mapping on both the simulator and the model, recomputes the model's
    ``component_set`` with ``directConnect`` and finally records ``big_step``
    as the active child of ``simulation_state``.
    """
    resolved = {}
    for portname, listener in self.listeners_by_string.iteritems():
        resolved[getattr(self.root_model, portname)] = listener
    self.listeners = resolved
    self.model.listeners = self.listeners
    self.model.component_set = directConnect(self.model.models, self.listeners)
    active_children = self.current_state[self.Root_main_simulation_state]
    active_children.append(self.Root_main_simulation_state_big_step)
-
def exit_Root_main_simulation_state_big_step(self):
    """Empty the active-child list of ``simulation_state`` in ``current_state``."""
    region = self.Root_main_simulation_state
    self.current_state[region] = []
-
def enter_Root_main_breakpoint_manager_breakpoint_manage(self):
    """Append ``breakpoint_manage`` to the active children of ``breakpoint_manager``."""
    active_children = self.current_state[self.Root_main_breakpoint_manager]
    active_children.append(self.Root_main_breakpoint_manager_breakpoint_manage)
-
def exit_Root_main_breakpoint_manager_breakpoint_manage(self):
    """Empty the active-child list of ``breakpoint_manager`` in ``current_state``."""
    region = self.Root_main_breakpoint_manager
    self.current_state[region] = []
-
def enter_Root_main_listeners_listening(self):
    """Append ``listening`` to the active children of ``listeners``."""
    active_children = self.current_state[self.Root_main_listeners]
    active_children.append(self.Root_main_listeners_listening)
-
def exit_Root_main_listeners_listening(self):
    """Empty the active-child list of ``listeners`` in ``current_state``."""
    region = self.Root_main_listeners
    self.current_state[region] = []
-
def enter_Root_main_reset_reset(self):
    """Append the inner ``reset`` state to the active children of the ``reset`` region."""
    active_children = self.current_state[self.Root_main_reset]
    active_children.append(self.Root_main_reset_reset)
-
def exit_Root_main_reset_reset(self):
    """Empty the active-child list of the ``reset`` region in ``current_state``."""
    region = self.Root_main_reset
    self.current_state[region] = []
-
- # Statechart enter/exit default method(s)
-
def enterDefault_Root_main(self):
    """Enter ``main`` and then default-enter each of its eight regions in order."""
    self.enter_Root_main()
    region_entries = (
        self.enterDefault_Root_main_injection_monitor,
        self.enterDefault_Root_main_tracer_monitor,
        self.enterDefault_Root_main_realtime_interrupt_monitor,
        self.enterDefault_Root_main_simulation_flow,
        self.enterDefault_Root_main_simulation_state,
        self.enterDefault_Root_main_breakpoint_manager,
        self.enterDefault_Root_main_listeners,
        self.enterDefault_Root_main_reset,
    )
    for enter_region in region_entries:
        enter_region()
-
def enterDefault_Root_main_injection_monitor(self):
    """Default entry of ``injection_monitor``: enter the region, then its ``inject`` state."""
    self.enter_Root_main_injection_monitor()
    self.enter_Root_main_injection_monitor_inject()
-
def enterDefault_Root_main_tracer_monitor(self):
    """Default entry of ``tracer_monitor``: enter the region, then its ``trace`` state."""
    self.enter_Root_main_tracer_monitor()
    self.enter_Root_main_tracer_monitor_trace()
-
def enterDefault_Root_main_realtime_interrupt_monitor(self):
    """Default entry of ``realtime_interrupt_monitor``: enter the region, then ``realtime_interrupt``."""
    self.enter_Root_main_realtime_interrupt_monitor()
    self.enter_Root_main_realtime_interrupt_monitor_realtime_interrupt()
-
def enterDefault_Root_main_simulation_flow(self):
    """Default entry of ``simulation_flow``: enter the region, then its ``initialize`` state."""
    self.enter_Root_main_simulation_flow()
    self.enter_Root_main_simulation_flow_initialize()
-
def enterDefault_Root_main_simulation_flow_check_termination(self):
    """Default entry of ``check_termination``: enter the state, then its ``workaround`` child."""
    self.enter_Root_main_simulation_flow_check_termination()
    self.enter_Root_main_simulation_flow_check_termination_workaround()
-
def enterDefault_Root_main_simulation_flow_do_simulation(self):
    """Default entry of ``do_simulation``: enter the state, then its ``init`` child."""
    self.enter_Root_main_simulation_flow_do_simulation()
    self.enter_Root_main_simulation_flow_do_simulation_init()
-
def enterDefault_Root_main_simulation_state(self):
    """Default entry of ``simulation_state``: enter the region, then its ``paused`` state."""
    self.enter_Root_main_simulation_state()
    self.enter_Root_main_simulation_state_paused()
-
def enterDefault_Root_main_breakpoint_manager(self):
    """Default entry of ``breakpoint_manager``: enter the region, then ``breakpoint_manage``."""
    self.enter_Root_main_breakpoint_manager()
    self.enter_Root_main_breakpoint_manager_breakpoint_manage()
-
def enterDefault_Root_main_listeners(self):
    """Default entry of ``listeners``: enter the region, then its ``listening`` state."""
    self.enter_Root_main_listeners()
    self.enter_Root_main_listeners_listening()
-
def enterDefault_Root_main_reset(self):
    """Default entry of the ``reset`` region: enter the region, then its inner ``reset`` state."""
    self.enter_Root_main_reset()
    self.enter_Root_main_reset_reset()
-
- # Statechart transitions
-
def generateCandidatesChildren_Root(self):
    """Delegate candidate generation to whichever child of ``Root`` is active.

    :returns: the delegate's result, or ``False`` when the first active
        child of ``Root`` is neither ``initializing`` nor ``main``.
    """
    active = self.current_state[self.Root][0]
    dispatch = (
        (self.Root_initializing, self.generateCandidates_Root_initializing),
        (self.Root_main, self.generateCandidates_Root_main),
    )
    for state, generate in dispatch:
        if active == state:
            return generate()
    return False
-
def generateCandidates_Root(self):
    """Generate candidates below ``Root`` unless its arena already changed.

    :returns: ``True`` when the combo step reports the ``Root`` arena as
        changed, otherwise the result of ``generateCandidatesChildren_Root``.
    """
    if self.combo_step.isArenaChanged(self.Root):
        return True
    return self.generateCandidatesChildren_Root()
-
def generateCandidatesCurrent_Root_initializing(self):
    """Offer the transition out of ``initializing``.

    The transition is added unconditionally (no event or guard is checked),
    so the former trailing ``return False`` was unreachable and has been
    removed together with the unused ``enabled_events`` local.

    :returns: always ``True``.
    """
    # The result is not needed here; the call itself is kept so the sequence
    # of runtime calls stays exactly as before.
    self.getEnabledEvents()
    self.small_step.addCandidate(self.transition_Root_initializing_1, [])
    return True
-
def generateCandidates_Root_initializing(self):
    """Generate candidates for ``initializing`` unless its arena already changed.

    :returns: ``True`` when the arena is reported as changed, otherwise the
        result of ``generateCandidatesCurrent_Root_initializing``.
    """
    if self.combo_step.isArenaChanged(self.Root_initializing):
        return True
    return self.generateCandidatesCurrent_Root_initializing()
-
def transition_Root_initializing_1(self, parameters):
    """Transition ``initializing`` -> ``main``.

    Wraps the user model in a ``RootDEVS``, initialises the time bookkeeping,
    builds the scheduler, resets the cached per-step values and finally
    default-enters ``main``.

    :param parameters: event parameters; not used by this transition.
    """
    self.exit_Root_initializing()
    # Direct connection
    if isinstance(self.model, CoupledDEVS):
        coupled = self.model
        coupled.component_set = directConnect(coupled.component_set, self.listeners)
        self.model = RootDEVS(coupled.component_set, coupled.component_set, None)
        self.model.listeners = self.listeners
        for component in self.model.component_set:
            component.time_last = (-component.elapsed, 1)
            ta = component.timeAdvance()
            component.time_next = (component.time_last[0] + ta, 1)
            component.old_states = [(component.time_last, pickle.dumps(component.state))]
    elif isinstance(self.model, AtomicDEVS):
        # A lone atomic model has nothing to route to: clear all port routing.
        for ports in (self.model.IPorts, self.model.OPorts):
            for port in ports:
                port.routing_inline = []
                port.routing_outline = []
        self.model = RootDEVS([self.model], [self.model], None)
        # NOTE(review): self.model is the RootDEVS wrapper from here on, so
        # elapsed/timeAdvance below are read from the wrapper, not from the
        # wrapped atomic model -- confirm this is intended.
        self.model.time_last = (-self.model.elapsed, 1)
        ta = self.model.timeAdvance()
        self.model.time_next = (self.model.time_last[0] + ta, 1)
    # Fixed configuration options
    components = self.model.component_set
    self.model.scheduler = Scheduler(components, 1e-6, len(components))
    self.time_next = self.model.scheduler.readFirst()
    # Cached values
    self.imminents = None
    self.outbag = None
    self.inbags = None
    self.transitioning = None
    self.new_states = None
    self.new_tn = None
    # Verbose trace file
    self.trace_file = None
    # Breakpoint management
    self.breakpoints = []
    # The pickled model used for a reset is taken in the user-defined
    # constructor, so nothing is saved here.
    self.transition_times = []
    self.interrupt_string = None
    self.combo_step.setArenaChanged(self.Root)
    self.enterDefault_Root_main()
-
def generateCandidatesChildren_Root_main(self):
    """Run candidate generation for every region of ``main``.

    All eight region generators are always called, in declaration order.

    :returns: ``False`` when every generator returned a falsy value,
        otherwise the truthy result of the last generator that produced one.
    """
    region_generators = (
        self.generateCandidates_Root_main_injection_monitor,
        self.generateCandidates_Root_main_tracer_monitor,
        self.generateCandidates_Root_main_realtime_interrupt_monitor,
        self.generateCandidates_Root_main_simulation_flow,
        self.generateCandidates_Root_main_simulation_state,
        self.generateCandidates_Root_main_breakpoint_manager,
        self.generateCandidates_Root_main_listeners,
        self.generateCandidates_Root_main_reset,
    )
    branch_done = False
    for generate in region_generators:
        branch_done = generate() or branch_done
    return branch_done
-
def generateCandidates_Root_main(self):
    """Generate candidates below ``main`` unless its arena already changed.

    :returns: ``True`` when the arena is reported as changed, otherwise the
        result of ``generateCandidatesChildren_Root_main``.
    """
    if self.combo_step.isArenaChanged(self.Root_main):
        return True
    return self.generateCandidatesChildren_Root_main()
-
def generateCandidatesChildren_Root_main_injection_monitor(self):
    """Delegate to ``inject`` when it is the active child of ``injection_monitor``.

    :returns: the delegate's result, or ``False`` when ``inject`` is not the
        first active child.
    """
    active = self.current_state[self.Root_main_injection_monitor][0]
    if active == self.Root_main_injection_monitor_inject:
        return self.generateCandidates_Root_main_injection_monitor_inject()
    return False
-
def generateCandidates_Root_main_injection_monitor(self):
    """Generate candidates below ``injection_monitor`` unless its arena already changed.

    :returns: ``True`` when the arena is reported as changed, otherwise the
        result of the children generator.
    """
    if self.combo_step.isArenaChanged(self.Root_main_injection_monitor):
        return True
    return self.generateCandidatesChildren_Root_main_injection_monitor()
-
def generateCandidatesCurrent_Root_main_injection_monitor_inject(self):
    """Offer the ``inject`` transition for the first matching enabled event.

    An event matches when its name is ``"inject"`` and its port is
    ``"request"``; the event's parameters are passed along with the candidate.

    :returns: ``True`` if a candidate was added, else ``False``.
    """
    for event in self.getEnabledEvents():
        if event.name == "inject" and event.port == "request":
            self.small_step.addCandidate(
                self.transition_Root_main_injection_monitor_inject_1, event.parameters)
            return True
    return False
-
def generateCandidates_Root_main_injection_monitor_inject(self):
    """Generate candidates for ``inject`` unless its arena already changed.

    :returns: ``True`` when the arena is reported as changed, otherwise the
        result of the current-state generator.
    """
    if self.combo_step.isArenaChanged(self.Root_main_injection_monitor_inject):
        return True
    return self.generateCandidatesCurrent_Root_main_injection_monitor_inject()
-
def transition_Root_main_injection_monitor_inject_1(self, parameters):
    """Handle an ``inject`` request (self-loop on ``inject``).

    The request's ``"time"`` entry is replaced in place by a ``(time, 1)``
    tuple, the request is queued in ``inject_queue`` (kept sorted by that
    tuple) and an ``inject_ok`` reply is emitted.

    :param parameters: event parameters; ``parameters[0]`` is the injection
        configuration mapping, which is mutated.
    """
    request = parameters[0]
    self.exit_Root_main_injection_monitor_inject()
    request["time"] = (request["time"], 1)
    self.inject_queue.append(request)
    self.inject_queue.sort(key=lambda queued: queued["time"])
    reply = Event("inject_ok", "reply", [])
    self.big_step.outputEvent(reply)
    self.combo_step.setArenaChanged(self.Root_main_injection_monitor)
    self.enter_Root_main_injection_monitor_inject()
-
def generateCandidatesChildren_Root_main_tracer_monitor(self):
    """Delegate to ``trace`` when it is the active child of ``tracer_monitor``.

    :returns: the delegate's result, or ``False`` when ``trace`` is not the
        first active child.
    """
    active = self.current_state[self.Root_main_tracer_monitor][0]
    if active == self.Root_main_tracer_monitor_trace:
        return self.generateCandidates_Root_main_tracer_monitor_trace()
    return False
-
def generateCandidates_Root_main_tracer_monitor(self):
    """Generate candidates below ``tracer_monitor`` unless its arena already changed.

    :returns: ``True`` when the arena is reported as changed, otherwise the
        result of the children generator.
    """
    if self.combo_step.isArenaChanged(self.Root_main_tracer_monitor):
        return True
    return self.generateCandidatesChildren_Root_main_tracer_monitor()
-
def generateCandidatesCurrent_Root_main_tracer_monitor_trace(self):
    """Offer the ``trace`` transition for the first matching enabled event.

    An event matches when its name is ``"trace"`` and its port is
    ``"request"``; the event's parameters are passed along with the candidate.

    :returns: ``True`` if a candidate was added, else ``False``.
    """
    for event in self.getEnabledEvents():
        if event.name == "trace" and event.port == "request":
            self.small_step.addCandidate(
                self.transition_Root_main_tracer_monitor_trace_1, event.parameters)
            return True
    return False
-
def generateCandidates_Root_main_tracer_monitor_trace(self):
    """Generate candidates for ``trace`` unless its arena already changed.

    :returns: ``True`` when the arena is reported as changed, otherwise the
        result of the current-state generator.
    """
    if self.combo_step.isArenaChanged(self.Root_main_tracer_monitor_trace):
        return True
    return self.generateCandidatesCurrent_Root_main_tracer_monitor_trace()
-
def transition_Root_main_tracer_monitor_trace_1(self, parameters):
    """Handle a ``trace`` request (self-loop on ``trace``).

    Opens ``parameters[0]`` for writing as the new trace file, or disables
    tracing when it is ``None``, then emits a ``trace_config_ok`` reply.

    A trace file left open by an earlier request is now closed first.
    Previously the old handle was simply overwritten, which leaked it and
    left buffered output unflushed until garbage collection.

    :param parameters: event parameters; ``parameters[0]`` is the trace file
        name or ``None``.
    :raises IOError: if the file cannot be opened (tracing is then disabled).
    """
    filename = parameters[0]
    self.exit_Root_main_tracer_monitor_trace()
    if self.trace_file is not None:
        self.trace_file.close()
        # Reset before opening so a failing open() does not leave a closed
        # file object installed as the active trace file.
        self.trace_file = None
    if filename is not None:
        self.trace_file = open(filename, 'w')
    self.big_step.outputEvent(Event("trace_config_ok", "reply", []))
    self.combo_step.setArenaChanged(self.Root_main_tracer_monitor)
    self.enter_Root_main_tracer_monitor_trace()
-
def generateCandidatesChildren_Root_main_realtime_interrupt_monitor(self):
    """Delegate to ``realtime_interrupt`` when it is the active child of its monitor region.

    :returns: the delegate's result, or ``False`` when ``realtime_interrupt``
        is not the first active child.
    """
    active = self.current_state[self.Root_main_realtime_interrupt_monitor][0]
    if active == self.Root_main_realtime_interrupt_monitor_realtime_interrupt:
        return self.generateCandidates_Root_main_realtime_interrupt_monitor_realtime_interrupt()
    return False
-
def generateCandidates_Root_main_realtime_interrupt_monitor(self):
    """Generate candidates below ``realtime_interrupt_monitor`` unless its arena already changed.

    :returns: ``True`` when the arena is reported as changed, otherwise the
        result of the children generator.
    """
    if self.combo_step.isArenaChanged(self.Root_main_realtime_interrupt_monitor):
        return True
    return self.generateCandidatesChildren_Root_main_realtime_interrupt_monitor()
-
def generateCandidatesCurrent_Root_main_realtime_interrupt_monitor_realtime_interrupt(self):
    """Offer the ``realtime_interrupt`` transition for the first matching enabled event.

    An event matches when its name is ``"realtime_interrupt"`` and its port
    is ``"request"``; the event's parameters are passed with the candidate.

    :returns: ``True`` if a candidate was added, else ``False``.
    """
    for event in self.getEnabledEvents():
        if event.name == "realtime_interrupt" and event.port == "request":
            self.small_step.addCandidate(
                self.transition_Root_main_realtime_interrupt_monitor_realtime_interrupt_1,
                event.parameters)
            return True
    return False
-
def generateCandidates_Root_main_realtime_interrupt_monitor_realtime_interrupt(self):
    """Generate candidates for ``realtime_interrupt`` unless its arena already changed.

    :returns: ``True`` when the arena is reported as changed, otherwise the
        result of the current-state generator.
    """
    if self.combo_step.isArenaChanged(self.Root_main_realtime_interrupt_monitor_realtime_interrupt):
        return True
    return self.generateCandidatesCurrent_Root_main_realtime_interrupt_monitor_realtime_interrupt()
-
def transition_Root_main_realtime_interrupt_monitor_realtime_interrupt_1(self, parameters):
    """Handle a ``realtime_interrupt`` request (self-loop on ``realtime_interrupt``).

    Stores ``parameters[0]`` in ``self.interrupt_string``.

    :param parameters: event parameters; ``parameters[0]`` is the interrupt
        string.
    """
    requested = parameters[0]
    self.exit_Root_main_realtime_interrupt_monitor_realtime_interrupt()
    self.interrupt_string = requested
    changed_arena = self.Root_main_realtime_interrupt_monitor
    self.combo_step.setArenaChanged(changed_arena)
    self.enter_Root_main_realtime_interrupt_monitor_realtime_interrupt()
-
def generateCandidatesChildren_Root_main_simulation_flow(self):
    """Delegate candidate generation to the active child of ``simulation_flow``.

    :returns: the delegate's result, or ``False`` when the first active child
        is none of ``initialize``, ``check_termination``, ``do_simulation``.
    """
    active = self.current_state[self.Root_main_simulation_flow][0]
    dispatch = (
        (self.Root_main_simulation_flow_initialize,
         self.generateCandidates_Root_main_simulation_flow_initialize),
        (self.Root_main_simulation_flow_check_termination,
         self.generateCandidates_Root_main_simulation_flow_check_termination),
        (self.Root_main_simulation_flow_do_simulation,
         self.generateCandidates_Root_main_simulation_flow_do_simulation),
    )
    for state, generate in dispatch:
        if active == state:
            return generate()
    return False
-
def generateCandidates_Root_main_simulation_flow(self):
    """Generate candidates below ``simulation_flow`` unless its arena already changed.

    :returns: ``True`` when the arena is reported as changed, otherwise the
        result of the children generator.
    """
    if self.combo_step.isArenaChanged(self.Root_main_simulation_flow):
        return True
    return self.generateCandidatesChildren_Root_main_simulation_flow()
-
def generateCandidatesCurrent_Root_main_simulation_flow_initialize(self):
    """Offer the transition out of ``simulation_flow/initialize``.

    The transition is added unconditionally (no event or guard is checked),
    so the former trailing ``return False`` was unreachable and has been
    removed together with the unused ``enabled_events`` local.

    :returns: always ``True``.
    """
    # The result is not needed here; the call itself is kept so the sequence
    # of runtime calls stays exactly as before.
    self.getEnabledEvents()
    self.small_step.addCandidate(self.transition_Root_main_simulation_flow_initialize_1, [])
    return True
-
def generateCandidates_Root_main_simulation_flow_initialize(self):
    """Generate candidates for ``initialize`` unless its arena already changed.

    :returns: ``True`` when the arena is reported as changed, otherwise the
        result of the current-state generator.
    """
    if self.combo_step.isArenaChanged(self.Root_main_simulation_flow_initialize):
        return True
    return self.generateCandidatesCurrent_Root_main_simulation_flow_initialize()
-
def transition_Root_main_simulation_flow_initialize_1(self, parameters):
    """Transition ``initialize`` -> ``check_termination``.

    Emits an ``all_states_reset`` reply carrying the simulation time, a
    ``full name -> (time_next, state)`` mapping for every component, the
    pending structural changes and the breakpoints; afterwards the structural
    changes are reset to an empty dict.

    :param parameters: event parameters; not used by this transition.
    """
    self.exit_Root_main_simulation_flow_initialize()
    now = self.simulation_time
    snapshot = {}
    for component in self.model.component_set:
        snapshot[component.getModelFullName()] = (component.time_next, component.state)
    payload = [now, snapshot, self.structural_changes, self.breakpoints]
    self.big_step.outputEvent(Event("all_states_reset", "reply", payload))
    self.structural_changes = {}
    self.combo_step.setArenaChanged(self.Root_main_simulation_flow)
    self.enterDefault_Root_main_simulation_flow_check_termination()
-
def generateCandidatesChildren_Root_main_simulation_flow_check_termination(self):
    """Delegate candidate generation to the active child of ``check_termination``.

    :returns: the delegate's result, or ``False`` when the first active child
        is none of ``workaround``, ``wait``, ``small_step_check``,
        ``check_termination``.
    """
    active = self.current_state[self.Root_main_simulation_flow_check_termination][0]
    dispatch = (
        (self.Root_main_simulation_flow_check_termination_workaround,
         self.generateCandidates_Root_main_simulation_flow_check_termination_workaround),
        (self.Root_main_simulation_flow_check_termination_wait,
         self.generateCandidates_Root_main_simulation_flow_check_termination_wait),
        (self.Root_main_simulation_flow_check_termination_small_step_check,
         self.generateCandidates_Root_main_simulation_flow_check_termination_small_step_check),
        (self.Root_main_simulation_flow_check_termination_check_termination,
         self.generateCandidates_Root_main_simulation_flow_check_termination_check_termination),
    )
    for state, generate in dispatch:
        if active == state:
            return generate()
    return False
-
def generateCandidates_Root_main_simulation_flow_check_termination(self):
    """Generate candidates below ``check_termination`` unless its arena already changed.

    :returns: ``True`` when the arena is reported as changed, otherwise the
        result of the children generator.
    """
    if self.combo_step.isArenaChanged(self.Root_main_simulation_flow_check_termination):
        return True
    return self.generateCandidatesChildren_Root_main_simulation_flow_check_termination()
-
def generateCandidatesCurrent_Root_main_simulation_flow_check_termination_workaround(self):
    """Offer the ``workaround`` transition for the first enabled ``"_0after"`` event.

    The event's parameters are passed along with the candidate.

    :returns: ``True`` if a candidate was added, else ``False``.
    """
    for event in self.getEnabledEvents():
        if event.name == "_0after":
            self.small_step.addCandidate(
                self.transition_Root_main_simulation_flow_check_termination_workaround_1,
                event.parameters)
            return True
    return False
-
def generateCandidates_Root_main_simulation_flow_check_termination_workaround(self):
    """Generate candidates for ``workaround`` unless its arena already changed.

    :returns: ``True`` when the arena is reported as changed, otherwise the
        result of the current-state generator.
    """
    if self.combo_step.isArenaChanged(self.Root_main_simulation_flow_check_termination_workaround):
        return True
    return self.generateCandidatesCurrent_Root_main_simulation_flow_check_termination_workaround()
-
def transition_Root_main_simulation_flow_check_termination_workaround_1(self, parameters):
    """Transition ``workaround`` -> ``check_termination`` (inner state).

    :param parameters: event parameters; not used by this transition.
    """
    self.exit_Root_main_simulation_flow_check_termination_workaround()
    changed_arena = self.Root_main_simulation_flow_check_termination
    self.combo_step.setArenaChanged(changed_arena)
    self.enter_Root_main_simulation_flow_check_termination_check_termination()
-
def generateCandidatesCurrent_Root_main_simulation_flow_check_termination_wait(self):
    """Offer a transition out of ``wait``.

    Transition 1 is offered when the simulation state is ``paused``;
    otherwise transition 2 is offered for the first enabled ``"_1after"``
    event, with that event's parameters.

    :returns: ``True`` if a candidate was added, else ``False``.
    """
    candidates = self.small_step
    self.getEnabledEvents()
    if self.inState([self.Root_main_simulation_state_paused]):
        candidates.addCandidate(
            self.transition_Root_main_simulation_flow_check_termination_wait_1, [])
        return True
    for event in self.getEnabledEvents():
        if event.name == "_1after":
            candidates.addCandidate(
                self.transition_Root_main_simulation_flow_check_termination_wait_2,
                event.parameters)
            return True
    return False
-
def generateCandidates_Root_main_simulation_flow_check_termination_wait(self):
    """Generate candidates for ``wait`` unless its arena already changed.

    :returns: ``True`` when the arena is reported as changed, otherwise the
        result of the current-state generator.
    """
    if self.combo_step.isArenaChanged(self.Root_main_simulation_flow_check_termination_wait):
        return True
    return self.generateCandidatesCurrent_Root_main_simulation_flow_check_termination_wait()
-
def transition_Root_main_simulation_flow_check_termination_wait_1(self, parameters):
    """Transition ``wait`` -> ``check_termination`` (inner state), first variant.

    :param parameters: event parameters; not used by this transition.
    """
    self.exit_Root_main_simulation_flow_check_termination_wait()
    changed_arena = self.Root_main_simulation_flow_check_termination
    self.combo_step.setArenaChanged(changed_arena)
    self.enter_Root_main_simulation_flow_check_termination_check_termination()
-
def transition_Root_main_simulation_flow_check_termination_wait_2(self, parameters):
    """Transition ``wait`` -> ``check_termination`` (inner state), second variant.

    :param parameters: event parameters; not used by this transition.
    """
    self.exit_Root_main_simulation_flow_check_termination_wait()
    changed_arena = self.Root_main_simulation_flow_check_termination
    self.combo_step.setArenaChanged(changed_arena)
    self.enter_Root_main_simulation_flow_check_termination_check_termination()
-
def generateCandidatesCurrent_Root_main_simulation_flow_check_termination_small_step_check(self):
    """Offer a transition out of ``small_step_check`` based on ``should_terminate(False)``.

    The guards are evaluated in order, each with its own call to
    ``should_terminate(False)``: ``== -2`` offers transition 1, ``== -1``
    offers transition 2 and ``> -1`` offers transition 3.

    :returns: ``True`` if a candidate was added, else ``False``.
    """
    def offer(transition):
        self.small_step.addCandidate(transition, [])
        return True

    # should_terminate is deliberately re-evaluated per guard rather than
    # cached, so the sequence of calls matches the generated original.
    self.getEnabledEvents()
    if self.should_terminate(False) == -2:
        return offer(self.transition_Root_main_simulation_flow_check_termination_small_step_check_1)
    self.getEnabledEvents()
    if self.should_terminate(False) == -1:
        return offer(self.transition_Root_main_simulation_flow_check_termination_small_step_check_2)
    self.getEnabledEvents()
    if self.should_terminate(False) > -1:
        return offer(self.transition_Root_main_simulation_flow_check_termination_small_step_check_3)
    return False
-
def generateCandidates_Root_main_simulation_flow_check_termination_small_step_check(self):
    """Generate candidates for ``small_step_check`` unless its arena already changed.

    :returns: ``True`` when the arena is reported as changed, otherwise the
        result of the current-state generator.
    """
    if self.combo_step.isArenaChanged(self.Root_main_simulation_flow_check_termination_small_step_check):
        return True
    return self.generateCandidatesCurrent_Root_main_simulation_flow_check_termination_small_step_check()
-
def transition_Root_main_simulation_flow_check_termination_small_step_check_1(self, parameters):
    """Transition ``small_step_check`` -> ``do_simulation`` (default entry).

    Exits the whole ``check_termination`` composite state.

    :param parameters: event parameters; not used by this transition.
    """
    self.exit_Root_main_simulation_flow_check_termination()
    changed_arena = self.Root_main_simulation_flow
    self.combo_step.setArenaChanged(changed_arena)
    self.enterDefault_Root_main_simulation_flow_do_simulation()
-
def transition_Root_main_simulation_flow_check_termination_small_step_check_2(self, parameters):
    """Transition ``small_step_check`` -> ``check_termination`` (inner state).

    Raises the internal ``termination_condition`` event.

    :param parameters: event parameters; not used by this transition.
    """
    self.exit_Root_main_simulation_flow_check_termination_small_step_check()
    terminated = Event("termination_condition", None, [])
    self.raiseInternalEvent(terminated)
    self.combo_step.setArenaChanged(self.Root_main_simulation_flow_check_termination)
    self.enter_Root_main_simulation_flow_check_termination_check_termination()
-
def transition_Root_main_simulation_flow_check_termination_small_step_check_3(self, parameters):
    """Transition ``small_step_check`` -> ``check_termination`` on a breakpoint.

    Calls ``process_breakpoints(False)``, emits a ``breakpoint_triggered``
    reply with the breakpoint id and the serialized ``transitioning``,
    ``outbag`` and ``inbags`` values, then raises the internal
    ``termination_condition`` event.

    :param parameters: event parameters; not used by this transition.
    """
    self.exit_Root_main_simulation_flow_check_termination_small_step_check()
    payload = [self.process_breakpoints(False)]
    for cached in ('transitioning', 'outbag', 'inbags'):
        payload.append(self.serialize(cached, getattr(self, cached)))
    self.big_step.outputEvent(Event("breakpoint_triggered", "reply", payload))
    self.raiseInternalEvent(Event("termination_condition", None, []))
    self.combo_step.setArenaChanged(self.Root_main_simulation_flow_check_termination)
    self.enter_Root_main_simulation_flow_check_termination_check_termination()
-
def generateCandidatesCurrent_Root_main_simulation_flow_check_termination_check_termination(self):
    """Offer the first enabled transition out of the inner ``check_termination`` state.

    Guards are tried strictly in order (transitions 1..10). The first eight
    depend on the active simulation state and on ``should_terminate``;
    transition 9 fires on a ``small_step`` request and transition 10 on a
    ``god_event`` request while paused.

    ``should_terminate`` is re-evaluated inside every guard whose state test
    passes, and ``the_time`` is read only after that call, so the evaluation
    order below must not be changed.

    :returns: ``True`` if a candidate was added, else ``False``.
    """
    continuous = self.Root_main_simulation_state_continuous
    big_step = self.Root_main_simulation_state_big_step
    realtime = self.Root_main_simulation_state_realtime
    paused = self.Root_main_simulation_state_paused

    def offer(transition, arguments):
        self.small_step.addCandidate(transition, arguments)
        return True

    self.getEnabledEvents()
    if self.inState([continuous]) and self.should_terminate(False) == -2:
        return offer(self.transition_Root_main_simulation_flow_check_termination_check_termination_1, [])
    self.getEnabledEvents()
    if self.inState([big_step]) and self.should_terminate(False) == -2:
        return offer(self.transition_Root_main_simulation_flow_check_termination_check_termination_2, [])
    self.getEnabledEvents()
    if self.inState([realtime]) and self.should_terminate(True) == -2 and self.the_time > 0.0:
        return offer(self.transition_Root_main_simulation_flow_check_termination_check_termination_3, [])
    self.getEnabledEvents()
    if self.inState([realtime]) and self.should_terminate(True) == -2 and self.the_time <= 0.0:
        return offer(self.transition_Root_main_simulation_flow_check_termination_check_termination_4, [])
    self.getEnabledEvents()
    if not self.inState([paused]) and self.inState([realtime]) and self.should_terminate(True) == -1:
        return offer(self.transition_Root_main_simulation_flow_check_termination_check_termination_5, [])
    self.getEnabledEvents()
    if not self.inState([paused]) and not self.inState([realtime]) and self.should_terminate(False) == -1:
        return offer(self.transition_Root_main_simulation_flow_check_termination_check_termination_6, [])
    self.getEnabledEvents()
    if not self.inState([paused]) and self.inState([realtime]) and self.should_terminate(True) > -1:
        return offer(self.transition_Root_main_simulation_flow_check_termination_check_termination_7, [])
    self.getEnabledEvents()
    if not self.inState([paused]) and not self.inState([realtime]) and self.should_terminate(False) > -1:
        return offer(self.transition_Root_main_simulation_flow_check_termination_check_termination_8, [])
    for event in self.getEnabledEvents():
        if event.name == "small_step" and event.port == "request":
            return offer(self.transition_Root_main_simulation_flow_check_termination_check_termination_9,
                         event.parameters)
    for event in self.getEnabledEvents():
        if event.name == "god_event" and event.port == "request":
            # Formal parameter of the guard; unused, but the lookup is kept
            # so an event without parameters fails here exactly as before.
            configuration = event.parameters[0]
            if self.inState([paused]):
                return offer(self.transition_Root_main_simulation_flow_check_termination_check_termination_10,
                             event.parameters)
    return False
-
def generateCandidates_Root_main_simulation_flow_check_termination_check_termination(self):
    """Generate candidates for the inner ``check_termination`` state unless its arena already changed.

    :returns: ``True`` when the arena is reported as changed, otherwise the
        result of the current-state generator.
    """
    if self.combo_step.isArenaChanged(self.Root_main_simulation_flow_check_termination_check_termination):
        return True
    return self.generateCandidatesCurrent_Root_main_simulation_flow_check_termination_check_termination()
-
def transition_Root_main_simulation_flow_check_termination_check_termination_1(self, parameters):
    """Transition 1: leave ``check_termination`` and default-enter ``do_simulation``.

    :param parameters: event parameters; not used by this transition.
    """
    self.exit_Root_main_simulation_flow_check_termination()
    changed_arena = self.Root_main_simulation_flow
    self.combo_step.setArenaChanged(changed_arena)
    self.enterDefault_Root_main_simulation_flow_do_simulation()
-
def transition_Root_main_simulation_flow_check_termination_check_termination_2(self, parameters):
    """Transition 2: leave ``check_termination`` and default-enter ``do_simulation``.

    :param parameters: event parameters; not used by this transition.
    """
    self.exit_Root_main_simulation_flow_check_termination()
    changed_arena = self.Root_main_simulation_flow
    self.combo_step.setArenaChanged(changed_arena)
    self.enterDefault_Root_main_simulation_flow_do_simulation()
-
def transition_Root_main_simulation_flow_check_termination_check_termination_3(self, parameters):
    """Transition 3: inner ``check_termination`` -> ``wait``.

    :param parameters: event parameters; not used by this transition.
    """
    self.exit_Root_main_simulation_flow_check_termination_check_termination()
    changed_arena = self.Root_main_simulation_flow_check_termination
    self.combo_step.setArenaChanged(changed_arena)
    self.enter_Root_main_simulation_flow_check_termination_wait()
-
def transition_Root_main_simulation_flow_check_termination_check_termination_4(self, parameters):
    """Transition 4: leave ``check_termination`` and default-enter ``do_simulation``.

    :param parameters: event parameters; not used by this transition.
    """
    self.exit_Root_main_simulation_flow_check_termination()
    changed_arena = self.Root_main_simulation_flow
    self.combo_step.setArenaChanged(changed_arena)
    self.enterDefault_Root_main_simulation_flow_do_simulation()
-
def transition_Root_main_simulation_flow_check_termination_check_termination_5(self, parameters):
    """Transition 5: inner ``check_termination`` -> ``workaround``.

    Raises the internal ``termination_condition`` event.

    :param parameters: event parameters; not used by this transition.
    """
    self.exit_Root_main_simulation_flow_check_termination_check_termination()
    terminated = Event("termination_condition", None, [])
    self.raiseInternalEvent(terminated)
    self.combo_step.setArenaChanged(self.Root_main_simulation_flow_check_termination)
    self.enter_Root_main_simulation_flow_check_termination_workaround()
-
def transition_Root_main_simulation_flow_check_termination_check_termination_6(self, parameters):
    """Transition 6: inner ``check_termination`` -> ``workaround``.

    Raises the internal ``termination_condition`` event.

    :param parameters: event parameters; not used by this transition.
    """
    self.exit_Root_main_simulation_flow_check_termination_check_termination()
    terminated = Event("termination_condition", None, [])
    self.raiseInternalEvent(terminated)
    self.combo_step.setArenaChanged(self.Root_main_simulation_flow_check_termination)
    self.enter_Root_main_simulation_flow_check_termination_workaround()
-
def transition_Root_main_simulation_flow_check_termination_check_termination_7(self, parameters):
    """Transition 7: inner ``check_termination`` -> ``workaround`` on a breakpoint.

    Calls ``process_breakpoints(True)``, emits a ``breakpoint_triggered``
    reply with the breakpoint id and the serialized ``transitioning``,
    ``outbag`` and ``inbags`` values, then raises the internal
    ``termination_condition`` event.

    :param parameters: event parameters; not used by this transition.
    """
    self.exit_Root_main_simulation_flow_check_termination_check_termination()
    payload = [self.process_breakpoints(True)]
    for cached in ('transitioning', 'outbag', 'inbags'):
        payload.append(self.serialize(cached, getattr(self, cached)))
    self.big_step.outputEvent(Event("breakpoint_triggered", "reply", payload))
    self.raiseInternalEvent(Event("termination_condition", None, []))
    self.combo_step.setArenaChanged(self.Root_main_simulation_flow_check_termination)
    self.enter_Root_main_simulation_flow_check_termination_workaround()
-
def transition_Root_main_simulation_flow_check_termination_check_termination_8(self, parameters):
    """Transition 8: inner ``check_termination`` -> ``workaround`` on a breakpoint.

    Calls ``process_breakpoints(False)``, emits a ``breakpoint_triggered``
    reply with the breakpoint id and the serialized ``transitioning``,
    ``outbag`` and ``inbags`` values, then raises the internal
    ``termination_condition`` event.

    :param parameters: event parameters; not used by this transition.
    """
    self.exit_Root_main_simulation_flow_check_termination_check_termination()
    payload = [self.process_breakpoints(False)]
    for cached in ('transitioning', 'outbag', 'inbags'):
        payload.append(self.serialize(cached, getattr(self, cached)))
    self.big_step.outputEvent(Event("breakpoint_triggered", "reply", payload))
    self.raiseInternalEvent(Event("termination_condition", None, []))
    self.combo_step.setArenaChanged(self.Root_main_simulation_flow_check_termination)
    self.enter_Root_main_simulation_flow_check_termination_workaround()
-
def transition_Root_main_simulation_flow_check_termination_check_termination_9(self, parameters):
    """Transition 9 (``small_step`` request): inner ``check_termination`` -> ``small_step_check``.

    Parses the request options, rebuilds the port -> listener mapping from
    ``listeners_by_string`` and, only when the number of listeners differs
    from what the model currently has, installs the new mapping on the model
    and re-runs ``directConnect``.

    :param parameters: event parameters; ``parameters[0]`` is the option
        configuration passed to ``parse_options``.
    """
    options = parameters[0]
    self.exit_Root_main_simulation_flow_check_termination_check_termination()
    self.parse_options(options)

    resolved = {}
    for portname, listener in self.listeners_by_string.iteritems():
        resolved[getattr(self.root_model, portname)] = listener
    self.listeners = resolved
    if len(self.listeners) != len(self.model.listeners):
        self.model.listeners = self.listeners
        self.model.component_set = directConnect(self.model.models, self.listeners)
    self.combo_step.setArenaChanged(self.Root_main_simulation_flow_check_termination)
    self.enter_Root_main_simulation_flow_check_termination_small_step_check()
-
def transition_Root_main_simulation_flow_check_termination_check_termination_10(self, parameters):
    """Transition 10 (``god_event`` request): inner ``check_termination`` -> ``workaround``.

    Overwrites one attribute of a model's state, recomputes that model's
    ``time_next`` from a fresh ``timeAdvance()``, reschedules it, and emits
    ``god_event_ok`` and ``new_tn`` replies.

    :param parameters: event parameters; ``parameters[0]`` is a mapping with
        the keys ``"model"``, ``"attribute"`` and ``"value"``.
    """
    request = parameters[0]
    self.exit_Root_main_simulation_flow_check_termination_check_termination()
    target_name = request["model"]
    attribute = request["attribute"]
    value = request["value"]
    target = self.find_model_with_name(target_name)
    setattr(target.state, attribute, value)
    # Call the timeadvance method again and compute new ta
    ta = target.timeAdvance()
    if ta:
        target.time_next = (target.time_last[0] + ta, 1)
    else:
        # A zero time advance keeps the same time and bumps the second
        # component of the (time, counter) tuple instead.
        target.time_next = (target.time_last[0] + ta, target.time_last[1] + 1)
    self.model.scheduler.massReschedule([target])
    state_reply = Event("god_event_ok", "reply", [{target.getModelFullName(): target.state}])
    self.big_step.outputEvent(state_reply)
    tn_reply = Event("new_tn", "reply", [self.simulation_time, {target.getModelFullName(): target.time_next}])
    self.big_step.outputEvent(tn_reply)
    self.combo_step.setArenaChanged(self.Root_main_simulation_flow_check_termination)
    self.enter_Root_main_simulation_flow_check_termination_workaround()
-
def generateCandidatesChildren_Root_main_simulation_flow_do_simulation(self):
    """Delegate candidate generation to the active child of ``do_simulation``.

    :returns: the delegate's result, or ``False`` when the first active child
        is none of the eight ``do_simulation`` sub-states.
    """
    active = self.current_state[self.Root_main_simulation_flow_do_simulation][0]
    dispatch = (
        (self.Root_main_simulation_flow_do_simulation_init,
         self.generateCandidates_Root_main_simulation_flow_do_simulation_init),
        (self.Root_main_simulation_flow_do_simulation_found_internal_imminents,
         self.generateCandidates_Root_main_simulation_flow_do_simulation_found_internal_imminents),
        (self.Root_main_simulation_flow_do_simulation_selected_imminent,
         self.generateCandidates_Root_main_simulation_flow_do_simulation_selected_imminent),
        (self.Root_main_simulation_flow_do_simulation_computed_outputfunction,
         self.generateCandidates_Root_main_simulation_flow_do_simulation_computed_outputfunction),
        (self.Root_main_simulation_flow_do_simulation_routed_messages,
         self.generateCandidates_Root_main_simulation_flow_do_simulation_routed_messages),
        (self.Root_main_simulation_flow_do_simulation_found_all_imminents,
         self.generateCandidates_Root_main_simulation_flow_do_simulation_found_all_imminents),
        (self.Root_main_simulation_flow_do_simulation_computed_transitions,
         self.generateCandidates_Root_main_simulation_flow_do_simulation_computed_transitions),
        (self.Root_main_simulation_flow_do_simulation_computed_ta,
         self.generateCandidates_Root_main_simulation_flow_do_simulation_computed_ta),
    )
    for state, generate in dispatch:
        if active == state:
            return generate()
    return False
-
def generateCandidates_Root_main_simulation_flow_do_simulation(self):
    """Generate candidates for ``do_simulation`` unless its arena changed.

    Returns True when the combo step already marked this arena as changed;
    otherwise returns whatever the children generator returns.
    """
    if self.combo_step.isArenaChanged(self.Root_main_simulation_flow_do_simulation):
        return True
    return self.generateCandidatesChildren_Root_main_simulation_flow_do_simulation()
-
def generateCandidatesCurrent_Root_main_simulation_flow_do_simulation_init(self):
    """Add the transition that leaves ``init``.

    Transition 1 is chosen when the simulator is not in the paused state,
    transition 2 when it is. Returns True when a candidate was added and
    False otherwise.
    """
    paused_state = self.Root_main_simulation_state_paused
    # The enabled events are fetched before each guard but never inspected.
    self.getEnabledEvents()
    if not self.inState([paused_state]):
        self.small_step.addCandidate(
            self.transition_Root_main_simulation_flow_do_simulation_init_1, [])
        return True
    self.getEnabledEvents()
    if self.inState([paused_state]):
        self.small_step.addCandidate(
            self.transition_Root_main_simulation_flow_do_simulation_init_2, [])
        return True
    return False
-
def generateCandidates_Root_main_simulation_flow_do_simulation_init(self):
    """Generate candidates for ``init`` unless its arena already changed.

    Returns True for a changed arena, else the result of the state's own
    candidate generator.
    """
    if self.combo_step.isArenaChanged(self.Root_main_simulation_flow_do_simulation_init):
        return True
    return self.generateCandidatesCurrent_Root_main_simulation_flow_do_simulation_init()
-
def transition_Root_main_simulation_flow_do_simulation_init_1(self, parameters):
    """Move from ``init`` to ``found_internal_imminents`` without output.

    ``parameters`` is accepted for the common transition signature and is
    not used.
    """
    self.exit_Root_main_simulation_flow_do_simulation_init()
    arena = self.Root_main_simulation_flow_do_simulation
    self.combo_step.setArenaChanged(arena)
    self.enter_Root_main_simulation_flow_do_simulation_found_internal_imminents()
-
def transition_Root_main_simulation_flow_do_simulation_init_2(self, parameters):
    """Move from ``init`` to ``found_internal_imminents``, reporting imminents.

    An ``imminents`` event carrying the simulation time and the serialized
    ``self.imminents`` is sent on the ``reply`` port. ``parameters`` is unused.
    """
    self.exit_Root_main_simulation_flow_do_simulation_init()
    payload = [self.simulation_time, self.serialize('imminents', self.imminents)]
    self.big_step.outputEvent(Event("imminents", "reply", payload))
    self.combo_step.setArenaChanged(self.Root_main_simulation_flow_do_simulation)
    self.enter_Root_main_simulation_flow_do_simulation_found_internal_imminents()
-
def generateCandidatesCurrent_Root_main_simulation_flow_do_simulation_found_internal_imminents(self):
    """Add the transition that leaves ``found_internal_imminents``.

    When the simulator is not in the paused state, transition 1 is added
    with no parameters. Otherwise the first enabled ``small_step`` event on
    port ``request`` selects transition 2 with that event's parameters.
    Returns True when a candidate was added, False otherwise.
    """
    self.getEnabledEvents()  # fetched before the guard, result not inspected
    if not self.inState([self.Root_main_simulation_state_paused]):
        self.small_step.addCandidate(
            self.transition_Root_main_simulation_flow_do_simulation_found_internal_imminents_1, [])
        return True
    for event in self.getEnabledEvents():
        if event.name == "small_step" and event.port == "request":
            self.small_step.addCandidate(
                self.transition_Root_main_simulation_flow_do_simulation_found_internal_imminents_2,
                event.parameters)
            return True
    return False
-
def generateCandidates_Root_main_simulation_flow_do_simulation_found_internal_imminents(self):
    """Generate candidates for ``found_internal_imminents`` unless its arena changed.

    Returns True for a changed arena, else the result of the state's own
    candidate generator.
    """
    if self.combo_step.isArenaChanged(self.Root_main_simulation_flow_do_simulation_found_internal_imminents):
        return True
    return self.generateCandidatesCurrent_Root_main_simulation_flow_do_simulation_found_internal_imminents()
-
def transition_Root_main_simulation_flow_do_simulation_found_internal_imminents_1(self, parameters):
    """Move from ``found_internal_imminents`` to ``selected_imminent`` without output.

    ``parameters`` is accepted for the common transition signature and is
    not used.
    """
    self.exit_Root_main_simulation_flow_do_simulation_found_internal_imminents()
    arena = self.Root_main_simulation_flow_do_simulation
    self.combo_step.setArenaChanged(arena)
    self.enter_Root_main_simulation_flow_do_simulation_selected_imminent()
-
def transition_Root_main_simulation_flow_do_simulation_found_internal_imminents_2(self, parameters):
    """Move to ``selected_imminent``, reporting the selected imminent model.

    A ``selected_imminent`` event carrying the simulation time and the
    serialized ``self.imminent`` is sent on the ``reply`` port.
    ``parameters`` is unused.
    """
    self.exit_Root_main_simulation_flow_do_simulation_found_internal_imminents()
    payload = [self.simulation_time, self.serialize('imminent', self.imminent)]
    self.big_step.outputEvent(Event("selected_imminent", "reply", payload))
    self.combo_step.setArenaChanged(self.Root_main_simulation_flow_do_simulation)
    self.enter_Root_main_simulation_flow_do_simulation_selected_imminent()
-
def generateCandidatesCurrent_Root_main_simulation_flow_do_simulation_selected_imminent(self):
    """Add the transition that leaves ``selected_imminent``.

    When the simulator is not in the paused state, transition 1 is added
    with no parameters. Otherwise the first enabled ``small_step`` event on
    port ``request`` selects transition 2 with that event's parameters.
    Returns True when a candidate was added, False otherwise.
    """
    self.getEnabledEvents()  # fetched before the guard, result not inspected
    if not self.inState([self.Root_main_simulation_state_paused]):
        self.small_step.addCandidate(
            self.transition_Root_main_simulation_flow_do_simulation_selected_imminent_1, [])
        return True
    for event in self.getEnabledEvents():
        if event.name == "small_step" and event.port == "request":
            self.small_step.addCandidate(
                self.transition_Root_main_simulation_flow_do_simulation_selected_imminent_2,
                event.parameters)
            return True
    return False
-
def generateCandidates_Root_main_simulation_flow_do_simulation_selected_imminent(self):
    """Generate candidates for ``selected_imminent`` unless its arena changed.

    Returns True for a changed arena, else the result of the state's own
    candidate generator.
    """
    if self.combo_step.isArenaChanged(self.Root_main_simulation_flow_do_simulation_selected_imminent):
        return True
    return self.generateCandidatesCurrent_Root_main_simulation_flow_do_simulation_selected_imminent()
-
def transition_Root_main_simulation_flow_do_simulation_selected_imminent_1(self, parameters):
    """Move from ``selected_imminent`` to ``computed_outputfunction`` without output.

    ``parameters`` is accepted for the common transition signature and is
    not used.
    """
    self.exit_Root_main_simulation_flow_do_simulation_selected_imminent()
    arena = self.Root_main_simulation_flow_do_simulation
    self.combo_step.setArenaChanged(arena)
    self.enter_Root_main_simulation_flow_do_simulation_computed_outputfunction()
-
def transition_Root_main_simulation_flow_do_simulation_selected_imminent_2(self, parameters):
    """Move to ``computed_outputfunction``, reporting the output bag.

    An ``outbag`` event carrying the simulation time and the serialized
    ``self.outbag`` is sent on the ``reply`` port. ``parameters`` is unused.
    """
    self.exit_Root_main_simulation_flow_do_simulation_selected_imminent()
    payload = [self.simulation_time, self.serialize('outbag', self.outbag)]
    self.big_step.outputEvent(Event("outbag", "reply", payload))
    self.combo_step.setArenaChanged(self.Root_main_simulation_flow_do_simulation)
    self.enter_Root_main_simulation_flow_do_simulation_computed_outputfunction()
-
def generateCandidatesCurrent_Root_main_simulation_flow_do_simulation_computed_outputfunction(self):
    """Add the transition that leaves ``computed_outputfunction``.

    When the simulator is not in the paused state, transition 1 is added
    with no parameters. Otherwise the first enabled ``small_step`` event on
    port ``request`` selects transition 2 with that event's parameters.
    Returns True when a candidate was added, False otherwise.
    """
    self.getEnabledEvents()  # fetched before the guard, result not inspected
    if not self.inState([self.Root_main_simulation_state_paused]):
        self.small_step.addCandidate(
            self.transition_Root_main_simulation_flow_do_simulation_computed_outputfunction_1, [])
        return True
    for event in self.getEnabledEvents():
        if event.name == "small_step" and event.port == "request":
            self.small_step.addCandidate(
                self.transition_Root_main_simulation_flow_do_simulation_computed_outputfunction_2,
                event.parameters)
            return True
    return False
-
def generateCandidates_Root_main_simulation_flow_do_simulation_computed_outputfunction(self):
    """Generate candidates for ``computed_outputfunction`` unless its arena changed.

    Returns True for a changed arena, else the result of the state's own
    candidate generator.
    """
    if self.combo_step.isArenaChanged(self.Root_main_simulation_flow_do_simulation_computed_outputfunction):
        return True
    return self.generateCandidatesCurrent_Root_main_simulation_flow_do_simulation_computed_outputfunction()
-
def transition_Root_main_simulation_flow_do_simulation_computed_outputfunction_1(self, parameters):
    """Move from ``computed_outputfunction`` to ``routed_messages`` without output.

    ``parameters`` is accepted for the common transition signature and is
    not used.
    """
    self.exit_Root_main_simulation_flow_do_simulation_computed_outputfunction()
    arena = self.Root_main_simulation_flow_do_simulation
    self.combo_step.setArenaChanged(arena)
    self.enter_Root_main_simulation_flow_do_simulation_routed_messages()
-
def transition_Root_main_simulation_flow_do_simulation_computed_outputfunction_2(self, parameters):
    """Move to ``routed_messages``, reporting the input bags.

    An ``inbags`` event carrying the simulation time and the serialized
    ``self.inbags`` is sent on the ``reply`` port. ``parameters`` is unused.
    """
    self.exit_Root_main_simulation_flow_do_simulation_computed_outputfunction()
    payload = [self.simulation_time, self.serialize('inbags', self.inbags)]
    self.big_step.outputEvent(Event("inbags", "reply", payload))
    self.combo_step.setArenaChanged(self.Root_main_simulation_flow_do_simulation)
    self.enter_Root_main_simulation_flow_do_simulation_routed_messages()
-
def generateCandidatesCurrent_Root_main_simulation_flow_do_simulation_routed_messages(self):
    """Add the transition that leaves ``routed_messages``.

    When the simulator is not in the paused state, transition 1 is added
    with no parameters. Otherwise the first enabled ``small_step`` event on
    port ``request`` selects transition 2 with that event's parameters.
    Returns True when a candidate was added, False otherwise.
    """
    self.getEnabledEvents()  # fetched before the guard, result not inspected
    if not self.inState([self.Root_main_simulation_state_paused]):
        self.small_step.addCandidate(
            self.transition_Root_main_simulation_flow_do_simulation_routed_messages_1, [])
        return True
    for event in self.getEnabledEvents():
        if event.name == "small_step" and event.port == "request":
            self.small_step.addCandidate(
                self.transition_Root_main_simulation_flow_do_simulation_routed_messages_2,
                event.parameters)
            return True
    return False
-
def generateCandidates_Root_main_simulation_flow_do_simulation_routed_messages(self):
    """Generate candidates for ``routed_messages`` unless its arena changed.

    Returns True for a changed arena, else the result of the state's own
    candidate generator.
    """
    if self.combo_step.isArenaChanged(self.Root_main_simulation_flow_do_simulation_routed_messages):
        return True
    return self.generateCandidatesCurrent_Root_main_simulation_flow_do_simulation_routed_messages()
-
def transition_Root_main_simulation_flow_do_simulation_routed_messages_1(self, parameters):
    """Move from ``routed_messages`` to ``found_all_imminents`` without output.

    ``parameters`` is accepted for the common transition signature and is
    not used.
    """
    self.exit_Root_main_simulation_flow_do_simulation_routed_messages()
    arena = self.Root_main_simulation_flow_do_simulation
    self.combo_step.setArenaChanged(arena)
    self.enter_Root_main_simulation_flow_do_simulation_found_all_imminents()
-
def transition_Root_main_simulation_flow_do_simulation_routed_messages_2(self, parameters):
    """Move to ``found_all_imminents``, reporting the transitioning models.

    A ``transitioning`` event carrying the simulation time and the
    serialized ``self.transitioning`` is sent on the ``reply`` port.
    ``parameters`` is unused.
    """
    self.exit_Root_main_simulation_flow_do_simulation_routed_messages()
    payload = [self.simulation_time, self.serialize('transitioning', self.transitioning)]
    self.big_step.outputEvent(Event("transitioning", "reply", payload))
    self.combo_step.setArenaChanged(self.Root_main_simulation_flow_do_simulation)
    self.enter_Root_main_simulation_flow_do_simulation_found_all_imminents()
-
def generateCandidatesCurrent_Root_main_simulation_flow_do_simulation_found_all_imminents(self):
    """Add the transition that leaves ``found_all_imminents``.

    When the simulator is not in the paused state, transition 1 is added
    with no parameters. Otherwise the first enabled ``small_step`` event on
    port ``request`` selects transition 2 with that event's parameters.
    Returns True when a candidate was added, False otherwise.
    """
    self.getEnabledEvents()  # fetched before the guard, result not inspected
    if not self.inState([self.Root_main_simulation_state_paused]):
        self.small_step.addCandidate(
            self.transition_Root_main_simulation_flow_do_simulation_found_all_imminents_1, [])
        return True
    for event in self.getEnabledEvents():
        if event.name == "small_step" and event.port == "request":
            self.small_step.addCandidate(
                self.transition_Root_main_simulation_flow_do_simulation_found_all_imminents_2,
                event.parameters)
            return True
    return False
-
def generateCandidates_Root_main_simulation_flow_do_simulation_found_all_imminents(self):
    """Generate candidates for ``found_all_imminents`` unless its arena changed.

    Returns True for a changed arena, else the result of the state's own
    candidate generator.
    """
    if self.combo_step.isArenaChanged(self.Root_main_simulation_flow_do_simulation_found_all_imminents):
        return True
    return self.generateCandidatesCurrent_Root_main_simulation_flow_do_simulation_found_all_imminents()
-
def transition_Root_main_simulation_flow_do_simulation_found_all_imminents_1(self, parameters):
    """Move from ``found_all_imminents`` to ``computed_transitions`` without output.

    ``parameters`` is accepted for the common transition signature and is
    not used.
    """
    self.exit_Root_main_simulation_flow_do_simulation_found_all_imminents()
    arena = self.Root_main_simulation_flow_do_simulation
    self.combo_step.setArenaChanged(arena)
    self.enter_Root_main_simulation_flow_do_simulation_computed_transitions()
-
def transition_Root_main_simulation_flow_do_simulation_found_all_imminents_2(self, parameters):
    """Move to ``computed_transitions``, reporting the new model states.

    A ``new_internal_states`` event carrying the simulation time and the
    serialized ``self.new_states`` is sent on the ``reply`` port.
    ``parameters`` is unused.
    """
    self.exit_Root_main_simulation_flow_do_simulation_found_all_imminents()
    payload = [self.simulation_time, self.serialize('new_states', self.new_states)]
    self.big_step.outputEvent(Event("new_internal_states", "reply", payload))
    self.combo_step.setArenaChanged(self.Root_main_simulation_flow_do_simulation)
    self.enter_Root_main_simulation_flow_do_simulation_computed_transitions()
-
def generateCandidatesCurrent_Root_main_simulation_flow_do_simulation_computed_transitions(self):
    """Add the transition that leaves ``computed_transitions``.

    When the simulator is not in the paused state, transition 1 is added
    with no parameters. Otherwise the first enabled ``small_step`` event on
    port ``request`` selects transition 2 with that event's parameters.
    Returns True when a candidate was added, False otherwise.
    """
    self.getEnabledEvents()  # fetched before the guard, result not inspected
    if not self.inState([self.Root_main_simulation_state_paused]):
        self.small_step.addCandidate(
            self.transition_Root_main_simulation_flow_do_simulation_computed_transitions_1, [])
        return True
    for event in self.getEnabledEvents():
        if event.name == "small_step" and event.port == "request":
            self.small_step.addCandidate(
                self.transition_Root_main_simulation_flow_do_simulation_computed_transitions_2,
                event.parameters)
            return True
    return False
-
def generateCandidates_Root_main_simulation_flow_do_simulation_computed_transitions(self):
    """Generate candidates for ``computed_transitions`` unless its arena changed.

    Returns True for a changed arena, else the result of the state's own
    candidate generator.
    """
    if self.combo_step.isArenaChanged(self.Root_main_simulation_flow_do_simulation_computed_transitions):
        return True
    return self.generateCandidatesCurrent_Root_main_simulation_flow_do_simulation_computed_transitions()
-
def transition_Root_main_simulation_flow_do_simulation_computed_transitions_1(self, parameters):
    """Move from ``computed_transitions`` to ``computed_ta`` without output.

    ``parameters`` is accepted for the common transition signature and is
    not used.
    """
    self.exit_Root_main_simulation_flow_do_simulation_computed_transitions()
    arena = self.Root_main_simulation_flow_do_simulation
    self.combo_step.setArenaChanged(arena)
    self.enter_Root_main_simulation_flow_do_simulation_computed_ta()
-
def transition_Root_main_simulation_flow_do_simulation_computed_transitions_2(self, parameters):
    """Move to ``computed_ta``, reporting the new next-event times.

    A ``new_tn`` event carrying the simulation time and the serialized
    ``self.new_tn`` is sent on the ``reply`` port. ``parameters`` is unused.
    """
    self.exit_Root_main_simulation_flow_do_simulation_computed_transitions()
    payload = [self.simulation_time, self.serialize('new_tn', self.new_tn)]
    self.big_step.outputEvent(Event("new_tn", "reply", payload))
    self.combo_step.setArenaChanged(self.Root_main_simulation_flow_do_simulation)
    self.enter_Root_main_simulation_flow_do_simulation_computed_ta()
-
def generateCandidatesCurrent_Root_main_simulation_flow_do_simulation_computed_ta(self):
    """Add the transition that leaves ``computed_ta``.

    Guards are checked in order: the continuous state selects transition 1,
    the realtime or big_step state selects transition 2, and otherwise the
    first enabled ``small_step`` event on port ``request`` selects
    transition 3 with that event's parameters. Returns True when a
    candidate was added, False otherwise.
    """
    # The enabled events are fetched before each guard even when unused.
    self.getEnabledEvents()
    if self.inState([self.Root_main_simulation_state_continuous]):
        self.small_step.addCandidate(
            self.transition_Root_main_simulation_flow_do_simulation_computed_ta_1, [])
        return True
    self.getEnabledEvents()
    stepping = (self.inState([self.Root_main_simulation_state_realtime])
                or self.inState([self.Root_main_simulation_state_big_step]))
    if stepping:
        self.small_step.addCandidate(
            self.transition_Root_main_simulation_flow_do_simulation_computed_ta_2, [])
        return True
    for event in self.getEnabledEvents():
        if event.name == "small_step" and event.port == "request":
            self.small_step.addCandidate(
                self.transition_Root_main_simulation_flow_do_simulation_computed_ta_3,
                event.parameters)
            return True
    return False
-
def generateCandidates_Root_main_simulation_flow_do_simulation_computed_ta(self):
    """Generate candidates for ``computed_ta`` unless its arena changed.

    Returns True for a changed arena, else the result of the state's own
    candidate generator.
    """
    if self.combo_step.isArenaChanged(self.Root_main_simulation_flow_do_simulation_computed_ta):
        return True
    return self.generateCandidatesCurrent_Root_main_simulation_flow_do_simulation_computed_ta()
-
def transition_Root_main_simulation_flow_do_simulation_computed_ta_1(self, parameters):
    """Leave the whole ``do_simulation`` state for ``check_termination``.

    No event is emitted. ``parameters`` is accepted for the common
    transition signature and is not used.
    """
    self.exit_Root_main_simulation_flow_do_simulation()
    arena = self.Root_main_simulation_flow
    self.combo_step.setArenaChanged(arena)
    self.enterDefault_Root_main_simulation_flow_check_termination()
-
def transition_Root_main_simulation_flow_do_simulation_computed_ta_2(self, parameters):
    """Leave ``do_simulation`` after a full step and report its results.

    Raises the internal ``big_step_done`` event, sends a ``new_states``
    reply (simulation time, serialized new states, serialized new
    next-event times, structural changes), resets ``structural_changes``
    and enters ``check_termination``. ``parameters`` is unused.
    """
    self.exit_Root_main_simulation_flow_do_simulation()
    self.raiseInternalEvent(Event("big_step_done", None, []))
    payload = [
        self.simulation_time,
        self.serialize('new_states', self.new_states),
        self.serialize('new_tn', self.new_tn),
        self.structural_changes,
    ]
    self.big_step.outputEvent(Event("new_states", "reply", payload))
    # The reported changes are handed over; start collecting afresh.
    self.structural_changes = {}
    self.combo_step.setArenaChanged(self.Root_main_simulation_flow)
    self.enterDefault_Root_main_simulation_flow_check_termination()
-
def transition_Root_main_simulation_flow_do_simulation_computed_ta_3(self, parameters):
    """Leave ``do_simulation`` and report the structural changes.

    Sends a ``structural_changes`` reply with the simulation time and the
    collected changes, resets ``structural_changes`` and enters
    ``check_termination``. ``parameters`` is unused.
    """
    self.exit_Root_main_simulation_flow_do_simulation()
    payload = [self.simulation_time, self.structural_changes]
    self.big_step.outputEvent(Event("structural_changes", "reply", payload))
    # The reported changes are handed over; start collecting afresh.
    self.structural_changes = {}
    self.combo_step.setArenaChanged(self.Root_main_simulation_flow)
    self.enterDefault_Root_main_simulation_flow_check_termination()
-
def generateCandidatesChildren_Root_main_simulation_state(self):
    """Delegate candidate generation to the active child of ``simulation_state``.

    The children are tried in declaration order; the first one that equals
    the currently active child has its ``generateCandidates_*`` method
    called and its result returned. Returns False if none matches.
    """
    parent = "Root_main_simulation_state"
    for child in ("paused", "continuous", "realtime", "big_step"):
        state_name = parent + "_" + child
        # The active child is looked up afresh for every comparison.
        if self.current_state[getattr(self, parent)][0] == getattr(self, state_name):
            return getattr(self, "generateCandidates_" + state_name)()
    return False
-
def generateCandidates_Root_main_simulation_state(self):
    """Generate candidates for ``simulation_state`` unless its arena changed.

    Returns True for a changed arena, else the result of the children
    generator.
    """
    if self.combo_step.isArenaChanged(self.Root_main_simulation_state):
        return True
    return self.generateCandidatesChildren_Root_main_simulation_state()
-
def generateCandidatesCurrent_Root_main_simulation_state_paused(self):
    """Add the transition that leaves ``paused`` for an incoming request.

    The event names ``simulate``, ``realtime`` and ``big_step`` (all on
    port ``request``) are tried in that order and map to transitions 1, 2
    and 3. The first matching event wins and supplies the parameters.
    Returns True when a candidate was added, False otherwise.
    """
    prefix = "transition_Root_main_simulation_state_paused_"
    triggers = (("simulate", "1"), ("realtime", "2"), ("big_step", "3"))
    for wanted, suffix in triggers:
        # The enabled events are fetched again for every trigger.
        for event in self.getEnabledEvents():
            if event.name == wanted and event.port == "request":
                self.small_step.addCandidate(getattr(self, prefix + suffix), event.parameters)
                return True
    return False
-
def generateCandidates_Root_main_simulation_state_paused(self):
    """Generate candidates for ``paused`` unless its arena changed.

    Returns True for a changed arena, else the result of the state's own
    candidate generator.
    """
    if self.combo_step.isArenaChanged(self.Root_main_simulation_state_paused):
        return True
    return self.generateCandidatesCurrent_Root_main_simulation_state_paused()
-
def transition_Root_main_simulation_state_paused_1(self, parameters):
    """Switch from ``paused`` to ``continuous``.

    ``parameters[0]`` is the configuration passed to ``parse_options``
    after the paused state has been exited.
    """
    options = parameters[0]
    self.exit_Root_main_simulation_state_paused()
    self.parse_options(options)
    arena = self.Root_main_simulation_state
    self.combo_step.setArenaChanged(arena)
    self.enter_Root_main_simulation_state_continuous()
-
def transition_Root_main_simulation_state_paused_2(self, parameters):
    """Switch from ``paused`` to ``realtime``.

    ``parameters[0]`` is the configuration passed to ``parse_options``
    after the paused state has been exited.
    """
    options = parameters[0]
    self.exit_Root_main_simulation_state_paused()
    self.parse_options(options)
    arena = self.Root_main_simulation_state
    self.combo_step.setArenaChanged(arena)
    self.enter_Root_main_simulation_state_realtime()
-
def transition_Root_main_simulation_state_paused_3(self, parameters):
    """Switch from ``paused`` to ``big_step``.

    ``parameters[0]`` is the configuration passed to ``parse_options``
    after the paused state has been exited.
    """
    options = parameters[0]
    self.exit_Root_main_simulation_state_paused()
    self.parse_options(options)
    arena = self.Root_main_simulation_state
    self.combo_step.setArenaChanged(arena)
    self.enter_Root_main_simulation_state_big_step()
-
def generateCandidatesCurrent_Root_main_simulation_state_continuous(self):
    """Add the transition triggered while in ``continuous``.

    A ``pause`` event on port ``request`` selects transition 1; failing
    that, a ``termination_condition`` event (any port) selects transition 2.
    The matching event supplies the parameters. Returns True when a
    candidate was added, False otherwise.
    """
    prefix = "transition_Root_main_simulation_state_continuous_"
    # A port of None means the port is not checked for that trigger.
    triggers = (("pause", "request", "1"), ("termination_condition", None, "2"))
    for wanted, port, suffix in triggers:
        for event in self.getEnabledEvents():
            if event.name == wanted and (port is None or event.port == port):
                self.small_step.addCandidate(getattr(self, prefix + suffix), event.parameters)
                return True
    return False
-
def generateCandidates_Root_main_simulation_state_continuous(self):
    """Generate candidates for ``continuous`` unless its arena changed.

    Returns True for a changed arena, else the result of the state's own
    candidate generator.
    """
    if self.combo_step.isArenaChanged(self.Root_main_simulation_state_continuous):
        return True
    return self.generateCandidatesCurrent_Root_main_simulation_state_continuous()
-
def transition_Root_main_simulation_state_continuous_1(self, parameters):
    """Handle a pause request while in ``continuous``.

    The state is exited and re-entered; in between, the termination
    condition is replaced by one that is always true and the termination
    time is cleared. ``parameters`` is unused.
    """
    def always_terminate(i, j, k):
        return True

    self.exit_Root_main_simulation_state_continuous()
    # Just override termination condition
    self.termination_condition = always_terminate
    self.termination_time = None
    self.combo_step.setArenaChanged(self.Root_main_simulation_state)
    self.enter_Root_main_simulation_state_continuous()
-
def transition_Root_main_simulation_state_continuous_2(self, parameters):
    """Terminate a continuous run and fall back to ``paused``.

    Sends a ``terminate`` reply, flushes the file, then sends an
    ``all_states`` reply holding, per component of ``self.model``, its
    ``(time_next, state)`` pair keyed by full model name, together with
    the structural changes. ``structural_changes`` is reset afterwards.
    ``parameters`` is unused.
    """
    self.exit_Root_main_simulation_state_continuous()
    self.big_step.outputEvent(Event("terminate", "reply", [self.simulation_time]))
    self.flush_file()
    snapshot = {
        component.getModelFullName(): (component.time_next, component.state)
        for component in self.model.component_set
    }
    payload = [self.simulation_time, snapshot, self.structural_changes]
    self.big_step.outputEvent(Event("all_states", "reply", payload))
    self.structural_changes = {}
    self.combo_step.setArenaChanged(self.Root_main_simulation_state)
    self.enter_Root_main_simulation_state_paused()
-
def generateCandidatesCurrent_Root_main_simulation_state_realtime(self):
    """Add the transition triggered while in ``realtime``.

    A ``pause`` event on port ``request`` selects transition 1; failing
    that, a ``termination_condition`` event (any port) selects transition 2.
    The matching event supplies the parameters. Returns True when a
    candidate was added, False otherwise.
    """
    prefix = "transition_Root_main_simulation_state_realtime_"
    # A port of None means the port is not checked for that trigger.
    triggers = (("pause", "request", "1"), ("termination_condition", None, "2"))
    for wanted, port, suffix in triggers:
        for event in self.getEnabledEvents():
            if event.name == wanted and (port is None or event.port == port):
                self.small_step.addCandidate(getattr(self, prefix + suffix), event.parameters)
                return True
    return False
-
def generateCandidates_Root_main_simulation_state_realtime(self):
    """Generate candidates for ``realtime`` unless its arena changed.

    Returns True for a changed arena, else the result of the state's own
    candidate generator.
    """
    if self.combo_step.isArenaChanged(self.Root_main_simulation_state_realtime):
        return True
    return self.generateCandidatesCurrent_Root_main_simulation_state_realtime()
-
def transition_Root_main_simulation_state_realtime_1(self, parameters):
    """Handle a pause request while in ``realtime``.

    The state is exited and re-entered; in between, the termination
    condition is replaced by one that is always true and the termination
    time is cleared. ``parameters`` is unused.
    """
    def always_terminate(i, j, k):
        return True

    self.exit_Root_main_simulation_state_realtime()
    # Just override termination condition
    self.termination_condition = always_terminate
    self.termination_time = None
    self.combo_step.setArenaChanged(self.Root_main_simulation_state)
    self.enter_Root_main_simulation_state_realtime()
-
def transition_Root_main_simulation_state_realtime_2(self, parameters):
    """Terminate a realtime run and fall back to ``paused``.

    Sends a ``terminate`` reply with the simulation time and flushes the
    file before entering ``paused``. ``parameters`` is unused.
    """
    self.exit_Root_main_simulation_state_realtime()
    finished = Event("terminate", "reply", [self.simulation_time])
    self.big_step.outputEvent(finished)
    self.flush_file()
    self.combo_step.setArenaChanged(self.Root_main_simulation_state)
    self.enter_Root_main_simulation_state_paused()
-
def generateCandidatesCurrent_Root_main_simulation_state_big_step(self):
    """Add the transition triggered while in ``big_step``.

    A ``big_step_done`` event selects transition 1; failing that, a
    ``termination_condition`` event selects transition 2. Ports are not
    checked. The matching event supplies the parameters. Returns True when
    a candidate was added, False otherwise.
    """
    prefix = "transition_Root_main_simulation_state_big_step_"
    for wanted, suffix in (("big_step_done", "1"), ("termination_condition", "2")):
        # The enabled events are fetched again for every trigger.
        for event in self.getEnabledEvents():
            if event.name == wanted:
                self.small_step.addCandidate(getattr(self, prefix + suffix), event.parameters)
                return True
    return False
-
def generateCandidates_Root_main_simulation_state_big_step(self):
    """Generate candidates for ``big_step`` unless its arena changed.

    Returns True for a changed arena, else the result of the state's own
    candidate generator.
    """
    if self.combo_step.isArenaChanged(self.Root_main_simulation_state_big_step):
        return True
    return self.generateCandidatesCurrent_Root_main_simulation_state_big_step()
-
def transition_Root_main_simulation_state_big_step_1(self, parameters):
    """Return from ``big_step`` to ``paused`` without output.

    ``parameters`` is accepted for the common transition signature and is
    not used.
    """
    self.exit_Root_main_simulation_state_big_step()
    arena = self.Root_main_simulation_state
    self.combo_step.setArenaChanged(arena)
    self.enter_Root_main_simulation_state_paused()
-
def transition_Root_main_simulation_state_big_step_2(self, parameters):
    """Terminate during a big step and fall back to ``paused``.

    Sends a ``terminate`` reply with the simulation time and flushes the
    file before entering ``paused``. ``parameters`` is unused.
    """
    self.exit_Root_main_simulation_state_big_step()
    finished = Event("terminate", "reply", [self.simulation_time])
    self.big_step.outputEvent(finished)
    self.flush_file()
    self.combo_step.setArenaChanged(self.Root_main_simulation_state)
    self.enter_Root_main_simulation_state_paused()
-
def generateCandidatesChildren_Root_main_breakpoint_manager(self):
    """Delegate to ``breakpoint_manage`` when it is the active child.

    Returns that child's generator result, or False when another child is
    active.
    """
    active = self.current_state[self.Root_main_breakpoint_manager][0]
    if not (active == self.Root_main_breakpoint_manager_breakpoint_manage):
        return False
    return self.generateCandidates_Root_main_breakpoint_manager_breakpoint_manage()
-
def generateCandidates_Root_main_breakpoint_manager(self):
    """Generate candidates for ``breakpoint_manager`` unless its arena changed.

    Returns True for a changed arena, else the result of the children
    generator.
    """
    if self.combo_step.isArenaChanged(self.Root_main_breakpoint_manager):
        return True
    return self.generateCandidatesChildren_Root_main_breakpoint_manager()
-
def generateCandidatesCurrent_Root_main_breakpoint_manager_breakpoint_manage(self):
    """Add the transition for an incoming breakpoint request.

    The event names ``add_breakpoint``, ``del_breakpoint`` and
    ``toggle_breakpoint`` (all on port ``request``) are tried in that order
    and map to transitions 1, 2 and 3. The first matching event wins and
    supplies the parameters. Returns True when a candidate was added,
    False otherwise.
    """
    prefix = "transition_Root_main_breakpoint_manager_breakpoint_manage_"
    triggers = (("add_breakpoint", "1"), ("del_breakpoint", "2"), ("toggle_breakpoint", "3"))
    for wanted, suffix in triggers:
        # The enabled events are fetched again for every trigger.
        for event in self.getEnabledEvents():
            if event.name == wanted and event.port == "request":
                self.small_step.addCandidate(getattr(self, prefix + suffix), event.parameters)
                return True
    return False
-
def generateCandidates_Root_main_breakpoint_manager_breakpoint_manage(self):
    """Generate candidates for ``breakpoint_manage`` unless its arena changed.

    Returns True for a changed arena, else the result of the state's own
    candidate generator.
    """
    if self.combo_step.isArenaChanged(self.Root_main_breakpoint_manager_breakpoint_manage):
        return True
    return self.generateCandidatesCurrent_Root_main_breakpoint_manager_breakpoint_manage()
-
def transition_Root_main_breakpoint_manager_breakpoint_manage_1(self, parameters):
    """Register a new breakpoint and re-enter ``breakpoint_manage``.

    ``parameters`` holds, in order: the breakpoint id, its function, the
    enabled flag and the disable-on-trigger flag.
    """
    breakpoint_id = parameters[0]
    function = parameters[1]
    enabled = parameters[2]
    disable_on_trigger = parameters[3]
    self.exit_Root_main_breakpoint_manager_breakpoint_manage()
    # TODO: report to the user if a breakpoint is invalid (trying to include in scope and reporting any exceptions).
    added = Breakpoint(breakpoint_id, function, enabled, disable_on_trigger)
    self.breakpoints.append(added)
    self.combo_step.setArenaChanged(self.Root_main_breakpoint_manager)
    self.enter_Root_main_breakpoint_manager_breakpoint_manage()
-
def transition_Root_main_breakpoint_manager_breakpoint_manage_2(self, parameters):
    """Delete every breakpoint whose id equals ``parameters[0]``.

    ``self.breakpoints`` is rebound to a new list that keeps the remaining
    breakpoints in their original order; the state is then re-entered.
    """
    del_breakpoint_id = parameters[0]
    self.exit_Root_main_breakpoint_manager_breakpoint_manage()
    remaining = []
    for candidate in self.breakpoints:
        if candidate.id != del_breakpoint_id:
            remaining.append(candidate)
    self.breakpoints = remaining
    self.combo_step.setArenaChanged(self.Root_main_breakpoint_manager)
    self.enter_Root_main_breakpoint_manager_breakpoint_manage()
-
def transition_Root_main_breakpoint_manager_breakpoint_manage_3(self, parameters):
    """Set the enabled flag of one breakpoint and re-enter the state.

    ``parameters[0]`` is the breakpoint id and ``parameters[1]`` the new
    enabled value. Only the first breakpoint with that id is updated;
    nothing happens when no breakpoint matches.
    """
    breakpoint_id = parameters[0]
    enabled = parameters[1]
    self.exit_Root_main_breakpoint_manager_breakpoint_manage()
    matches = (bp for bp in self.breakpoints if bp.id == breakpoint_id)
    for target in matches:
        target.enabled = enabled
        break  # only the first match is changed
    self.combo_step.setArenaChanged(self.Root_main_breakpoint_manager)
    self.enter_Root_main_breakpoint_manager_breakpoint_manage()
-
def generateCandidatesChildren_Root_main_listeners(self):
    """Delegate to ``listening`` when it is the active child.

    Returns that child's generator result, or False when another child is
    active.
    """
    active = self.current_state[self.Root_main_listeners][0]
    if not (active == self.Root_main_listeners_listening):
        return False
    return self.generateCandidates_Root_main_listeners_listening()
-
def generateCandidates_Root_main_listeners(self):
    """Generate candidates for ``listeners`` unless its arena changed.

    Returns True for a changed arena, else the result of the children
    generator.
    """
    if self.combo_step.isArenaChanged(self.Root_main_listeners):
        return True
    return self.generateCandidatesChildren_Root_main_listeners()
-
def generateCandidatesCurrent_Root_main_listeners_listening(self):
    """Add transition 1 for the first enabled ``set_listen_ports`` event.

    The port is not checked. The event's parameters are forwarded to the
    transition. Returns True when a candidate was added, False otherwise.
    """
    for event in self.getEnabledEvents():
        if not (event.name == "set_listen_ports"):
            continue
        self.small_step.addCandidate(
            self.transition_Root_main_listeners_listening_1, event.parameters)
        return True
    return False
-
def generateCandidates_Root_main_listeners_listening(self):
    """Generate candidates for ``listening`` unless its arena changed.

    Returns True for a changed arena, else the result of the state's own
    candidate generator.
    """
    if self.combo_step.isArenaChanged(self.Root_main_listeners_listening):
        return True
    return self.generateCandidatesCurrent_Root_main_listeners_listening()
-
def transition_Root_main_listeners_listening_1(self, parameters):
    """Record a listener function for a port and re-enter ``listening``.

    ``parameters[0]`` is the key and ``parameters[1]`` the value stored in
    ``self.listeners_by_string``; an existing entry for the key is replaced.
    """
    port = parameters[0]
    function = parameters[1]
    self.exit_Root_main_listeners_listening()
    self.listeners_by_string.update({port: function})
    arena = self.Root_main_listeners
    self.combo_step.setArenaChanged(arena)
    self.enter_Root_main_listeners_listening()
-
def generateCandidatesChildren_Root_main_reset(self):
    """Delegate to the inner ``reset`` state when it is the active child.

    Returns that child's generator result, or False when another child is
    active.
    """
    active = self.current_state[self.Root_main_reset][0]
    if not (active == self.Root_main_reset_reset):
        return False
    return self.generateCandidates_Root_main_reset_reset()
-
def generateCandidates_Root_main_reset(self):
    """Generate candidates for the outer ``reset`` state unless its arena changed.

    Returns True for a changed arena, else the result of the children
    generator.
    """
    if self.combo_step.isArenaChanged(self.Root_main_reset):
        return True
    return self.generateCandidatesChildren_Root_main_reset()
-
def generateCandidatesCurrent_Root_main_reset_reset(self):
    """Add transition 1 for the first enabled ``reset`` event on ``request``.

    The event's parameters are forwarded to the transition. Returns True
    when a candidate was added, False otherwise.
    """
    for event in self.getEnabledEvents():
        if not (event.name == "reset" and event.port == "request"):
            continue
        self.small_step.addCandidate(self.transition_Root_main_reset_reset_1, event.parameters)
        return True
    return False
-
def generateCandidates_Root_main_reset_reset(self):
    """Generate candidates for the inner ``reset`` state unless its arena changed.

    Returns True for a changed arena, else the result of the state's own
    candidate generator.
    """
    if self.combo_step.isArenaChanged(self.Root_main_reset_reset):
        return True
    return self.generateCandidatesCurrent_Root_main_reset_reset()
-
def transition_Root_main_reset_reset_1(self, parameters):
    """Reset the simulator to the model saved at construction time.

    Exits ``Root_main``, restores ``self.model`` by unpickling
    ``self.save_model``, re-runs ``initialize_simulation`` and enters
    ``Root_initializing``. ``parameters`` is unused.
    """
    self.exit_Root_main()
    restored = pickle.loads(self.save_model)
    self.model = restored
    self.initialize_simulation()
    self.combo_step.setArenaChanged(self.Root)
    self.enter_Root_initializing()
-
def generateCandidates(self):
    """Generate transition candidates for the current small step.

    Starts candidate generation at ``Root``; that call's result is
    discarded, so this method returns None.
    """
    self.generateCandidates_Root()
class ObjectManager(ObjectManagerBase):
    """Object manager that can build ``SCCDSimulator`` instances."""

    def __init__(self, controller):
        ObjectManagerBase.__init__(self, controller)

    def instantiate(self, class_name, construct_params):
        """Create an instance of the class called ``class_name``.

        Only ``"SCCDSimulator"`` is handled: it is built from the controller
        and ``construct_params[0]`` and given an empty ``associations`` dict.

        NOTE(review): the return sits at function level here, so any other
        ``class_name`` reaches it with ``instance`` unbound. That placement
        is reconstructed from flattened source; confirm against the
        generator output.
        """
        wants_simulator = class_name == "SCCDSimulator"
        if wants_simulator:
            model = construct_params[0]
            instance = SCCDSimulator(self.controller, model)
            instance.associations = {}
        return instance
class Controller(ThreadsControllerBase):
    """Threaded controller hosting a single ``SCCDSimulator`` instance.

    It has one input port, ``request``, and one output port, ``reply``.
    """

    def __init__(self, model, keep_running = None):
        """Set up the ports and create the simulator for ``model``.

        ``keep_running`` is passed to ``ThreadsControllerBase``; None (the
        default) is replaced by True.
        """
        # Identity test: only the None singleton selects the default, even
        # for arguments that define a custom __eq__.
        if keep_running is None:
            keep_running = True
        ThreadsControllerBase.__init__(self, ObjectManager(self), keep_running)
        self.addInputPort("request")
        self.addOutputPort("reply")
        self.object_manager.createInstance("SCCDSimulator", [model])
|