123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212 |
- from typing import *
- from sccd.statechart.static.statechart import *
- from sccd.statechart.dynamic.event import *
- from sccd.util.debug import print_debug
- from sccd.util.bitmap import *
- from sccd.action_lang.static.scope import *
- from sccd.action_lang.dynamic.exceptions import *
- from sccd.util import timer
- # Set of current states etc.
- class StatechartExecution:
- def __init__(self, statechart: Statechart, instance):
- self.statechart = statechart
- self.instance = instance
- self.gc_memory = None
- self.rhs_memory = None
- self.raise_internal = None
- self.raise_next_bs = None
- # these 2 fields have the same information
- self.configuration: List[State] = []
- self.configuration_bitmap: Bitmap = Bitmap()
- self.eventless_states = 0 # number of states in current configuration that have at least one eventless outgoing transition.
- # mapping from configuration_bitmap (=int) to configuration (=List[State])
- self.config_mem = {}
- # mapping from history state id to states to enter if history is target of transition
- self.history_values: Dict[int, List[State]] = {}
- # For each AfterTrigger in the statechart tree, we keep an expected 'id' that is
- # a parameter to a future 'after' event. This 'id' is incremented each time a timer
- # is started, so we only respond to the most recent one.
- self.timer_ids = [-1] * len(statechart.tree.after_triggers)
- # output events accumulate here until they are collected
- self.output = []
- # enter default states
- def initialize(self):
- states = self.statechart.tree.root.target_states(self, True)
- self.configuration.extend(self.statechart.tree.state_list[id] for id in states.items())
- self.configuration_bitmap = states
- ctx = EvalContext(current_state=self, events=[], memory=self.rhs_memory)
- if self.statechart.datamodel is not None:
- self.statechart.datamodel.exec(self.rhs_memory)
- for state in self.configuration:
- print_debug(termcolor.colored(' ENTER %s'%state.gen.full_name, 'green'))
- self.eventless_states += state.gen.has_eventless_transitions
- self._perform_actions(ctx, state.enter)
- self._start_timers(state.gen.after_triggers)
- self.rhs_memory.flush_transition()
- self.rhs_memory.flush_round()
- self.gc_memory.flush_round()
- # events: list SORTED by event id
- def fire_transition(self, events: List[Event], t: Transition):
- try:
- timer.start("transition")
- timer.start("exit states")
- # Exit set is the intersection between self.configuration and t.gen.arena.descendants.
- # The following was found to have better performance than reverse-iterating and filtering self.configuration or t.arena.gen.descendants lists, despite the fact that Bitmap.reverse_items() isn't very efficient.
- exit_ids = self.configuration_bitmap & t.gen.arena.gen.descendants_bitmap
- exit_set = (self.statechart.tree.state_list[id] for id in exit_ids.reverse_items())
- # Alternative implementation:
- # if len(self.configuration) < len(t.gen.arena.gen.descendants):
- # exit_set = (s for s in self.configuration if s.gen.state_id_bitmap & t.gen.arena.gen.descendants_bitmap)
- # else:
- # exit_set = (s for s in t.gen.arena.gen.descendants if s.gen.state_id_bitmap & self.configuration_bitmap)
- timer.stop("exit states")
- timer.start("enter states")
- # Enter path is the intersection between:
- # 1) the transitions target + the target's ancestors and
- # 2) the arena's descendants
- enter_path = (t.targets[0].gen.ancestors_bitmap | t.targets[0].gen.state_id_bitmap) & t.gen.arena.gen.descendants_bitmap
- # Now, along the enter path, there may be AND-states whose children we don't explicitly enter, but should enter.
- # That's why we call 'target_states' on every state on the path and join the results.
- items = enter_path.items()
- shifted = itertools.chain(enter_path.items(), [-1])
- next(shifted) # throw away first value
- pairwise = zip(items, shifted)
- enter_ids = Bitmap()
- for state_id, next_state_id in pairwise:
- enter_ids |= self.statechart.tree.state_list[state_id].target_states(self, next_state_id == -1)
- enter_set = (self.statechart.tree.state_list[id] for id in enter_ids.items())
- timer.stop("enter states")
- ctx = EvalContext(current_state=self, events=events, memory=self.rhs_memory)
- print_debug("fire " + str(t))
- timer.start("exit states")
- # exit states...
- for s in exit_set:
- # remember which state(s) we were in if a history state is present
- for h in s.gen.history:
- if isinstance(h, DeepHistoryState):
- f = lambda s0: not s0.gen.descendants_bitmap and s0.gen.state_id_bitmap & s.gen.descendants_bitmap
- else:
- f = lambda s0: s0.gen.ancestors_bitmap and s0.parent == s
- self.history_values[h.gen.state_id] = list(filter(f, self.configuration))
- print_debug(termcolor.colored(' EXIT %s' % s.gen.full_name, 'green'))
- self.eventless_states -= s.gen.has_eventless_transitions
- # execute exit action(s)
- self._perform_actions(ctx, s.exit)
- # self.rhs_memory.pop_local_scope(s.scope)
- self.configuration_bitmap &= ~s.gen.state_id_bitmap
- timer.stop("exit states")
- # execute transition action(s)
- timer.start("actions")
- self.rhs_memory.push_frame(t.scope) # make room for event parameters on stack
- if t.trigger:
- t.trigger.copy_params_to_stack(ctx)
- self._perform_actions(ctx, t.actions)
- self.rhs_memory.pop_frame()
- timer.stop("actions")
- timer.start("enter states")
- # enter states...
- for s in enter_set:
- print_debug(termcolor.colored(' ENTER %s' % s.gen.full_name, 'green'))
- self.eventless_states += s.gen.has_eventless_transitions
- self.configuration_bitmap |= s.gen.state_id_bitmap
- # execute enter action(s)
- self._perform_actions(ctx, s.enter)
- self._start_timers(s.gen.after_triggers)
- timer.stop("enter states")
-
- try:
- self.configuration = self.config_mem[self.configuration_bitmap]
- except KeyError:
- self.configuration = self.config_mem[self.configuration_bitmap] = [s for s in self.statechart.tree.state_list if self.configuration_bitmap & s.gen.state_id_bitmap]
- self.rhs_memory.flush_transition()
- timer.stop("transition")
- except SCCDRuntimeException as e:
- e.args = ("During execution of transition %s:\n" % str(t) +str(e),)
- raise
- def check_guard(self, t, events) -> bool:
- try:
- # Special case: after trigger
- if isinstance(t.trigger, AfterTrigger):
- e = [e for e in events if bit(e.id) & t.trigger.enabling_bitmap][0] # it's safe to assume the list will contain one element cause we only check a transition's guard after we know it may be enabled given the set of events
- if self.timer_ids[t.trigger.after_id] != e.params[0]:
- return False
- if t.guard is None:
- return True
- else:
- ctx = EvalContext(current_state=self, events=events, memory=self.gc_memory)
- self.gc_memory.push_frame(t.scope)
- # Guard conditions can also refer to event parameters
- if t.trigger:
- t.trigger.copy_params_to_stack(ctx)
- result = t.guard.eval(self.gc_memory)
- self.gc_memory.pop_frame()
- return result
- except SCCDRuntimeException as e:
- e.args = ("While checking guard of transition %s:\n" % str(t) +str(e),)
- raise
- def check_source(self, t) -> bool:
- return self.configuration_bitmap & t.source.gen.state_id_bitmap
- @staticmethod
- def _perform_actions(ctx: EvalContext, actions: List[Action]):
- for a in actions:
- a.exec(ctx)
- def _start_timers(self, triggers: List[AfterTrigger]):
- for after in triggers:
- delay: Duration = after.delay.eval(
- EvalContext(current_state=self, events=[], memory=self.gc_memory))
- timer_id = self._next_timer_id(after)
- self.raise_next_bs(Event(id=after.id, name=after.name, params=[timer_id]), delay)
- def _next_timer_id(self, trigger: AfterTrigger):
- self.timer_ids[trigger.after_id] += 1
- return self.timer_ids[trigger.after_id]
- # Return whether the current configuration includes ALL the states given.
- def in_state(self, state_strings: List[str]) -> bool:
- state_ids_bitmap = Bitmap.from_list((self.statechart.tree.state_dict[state_string].gen.state_id for state_string in state_strings))
- in_state = self.configuration_bitmap.has_all(state_ids_bitmap)
- # if in_state:
- # print_debug("in state"+str(state_strings))
- # else:
- # print_debug("not in state"+str(state_strings))
- return in_state
- def collect_output(self) -> List[OutputEvent]:
- output = self.output
- self.output = []
- return output
|