123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166 |
- from typing import *
- from sccd.statechart.static.statechart import *
- from sccd.statechart.dynamic.event import *
- from sccd.util.debug import print_debug
- from sccd.util.bitmap import *
- from sccd.action_lang.static.scope import *
- from sccd.common.exceptions import *
- from sccd.util import timer
- # Set of current states etc.
- class StatechartExecution:
- __slots__ = ["instance", "statechart", "gc_memory", "rhs_memory", "raise_internal", "raise_output", "configuration", "history_values", "timer_ids", "schedule_callback", "cancel_callback"]
- def __init__(self, instance, statechart: Statechart):
- self.instance = instance
- self.statechart = statechart
- self.gc_memory = None
- self.rhs_memory = None
- self.raise_internal = None
- self.raise_output = None
- # set of current states
- self.configuration: Bitmap = Bitmap()
- # Mapping from history_id to set of states to enter if history is target of transition
- self.history_values: List[Bitmap] = list(statechart.tree.initial_history_values)
- # Scheduled IDs for after triggers
- self.timer_ids = [None] * statechart.tree.timer_count
- # enter default states
- def initialize(self):
- self.configuration = self.statechart.tree.initial_states
- if self.statechart.datamodel is not None:
- self.statechart.datamodel.exec(self.rhs_memory)
- ctx = EvalContext(memory=self.rhs_memory, execution=self, params=[])
- for state in self.statechart.tree.bitmap_to_states(self.configuration):
- print_debug(termcolor.colored(' ENTER %s'%state.full_name, 'green'))
- _perform_actions(ctx, state.enter)
- self._start_timers(state.after_triggers)
- self.rhs_memory.flush_transition()
- self.rhs_memory.flush_round()
- self.gc_memory.flush_round()
- # events: list SORTED by event id
- def fire_transition(self, events: List[InternalEvent], t: Transition):
- try:
- with timer.Context("transition"):
- # Sequence of exit states is the intersection between set of current states and the arena's descendants.
- with timer.Context("exit set"):
- exit_ids = self.configuration & t.exit_mask
- exit_set = self.statechart.tree.bitmap_to_states_reverse(exit_ids)
- with timer.Context("enter set"):
- # Sequence of enter states is more complex but has for a large part already been computed statically.
- enter_ids = t.enter_states_static
- if t.target_history_id is not None:
- enter_ids |= self.history_values[t.target_history_id]
- enter_set = self.statechart.tree.bitmap_to_states(enter_ids)
- ctx = EvalContext(memory=self.rhs_memory, execution=self, params=get_event_params(events, t.trigger))
- print_debug("fire " + str(t))
- with timer.Context("exit states"):
- just_exited = None
- for s in exit_set:
- print_debug(termcolor.colored(' EXIT %s' % s.full_name, 'green'))
- if s.deep_history is not None:
- # s has a deep-history child:
- history_id, history_mask, _ = s.deep_history
- self.history_values[history_id] = exit_ids & history_mask
- if s.shallow_history is not None:
- history_id, _ = s.shallow_history
- self.history_values[history_id] = just_exited.effective_targets
- self._cancel_timers(s.after_triggers)
- _perform_actions(ctx, s.exit)
- self.configuration &= ~s.state_id_bitmap
- just_exited = s
- # execute transition action(s)
- with timer.Context("actions"):
- _perform_actions(ctx, t.actions)
- with timer.Context("enter states"):
- for s in enter_set:
- print_debug(termcolor.colored(' ENTER %s' % s.full_name, 'green'))
- self.configuration |= s.state_id_bitmap
- _perform_actions(ctx, s.enter)
- self._start_timers(s.after_triggers)
- self.rhs_memory.flush_transition()
- # input(">")
- except Exception as e:
- e.args = ("During execution of transition %s:\n" % str(t) +str(e),)
- raise
- def check_guard(self, t: Transition, events: List[InternalEvent]) -> bool:
- try:
- if t.guard is None:
- return True
- else:
- result = t.guard.eval(self.gc_memory)(self.gc_memory, self.configuration, *get_event_params(events, t.trigger))
- return result
- except Exception as e:
- e.args = ("While checking guard of transition %s:\n" % str(t) + str(e),)
- raise
- def _start_timers(self, triggers: List[AfterTrigger]):
- for after in triggers:
- delay: Duration = after.delay.eval(
- EvalContext(memory=self.gc_memory, execution=self, params=[]))
- self.timer_ids[after.after_id] = self.schedule_callback(delay, InternalEvent(id=after.id, name=after.name, params=[]), [self.instance])
- def _cancel_timers(self, triggers: List[AfterTrigger]):
- for after in triggers:
- if self.timer_ids[after.after_id] is not None:
- self.cancel_callback(self.timer_ids[after.after_id])
- self.timer_ids[after.after_id] = None
- # Return whether the current configuration includes ALL the states given.
- def in_state(self, state_strings: List[str]) -> bool:
- try:
- state_ids_bitmap = bm_union(self.statechart.tree.state_dict[s].state_id_bitmap for s in state_strings)
- except KeyError as e:
- raise ModelRuntimeError("INSTATE argument %s: invalid state" % str(e)) from e
- in_state = bm_has_all(self.configuration, state_ids_bitmap)
- # if in_state:
- # print_debug("in state"+str(state_strings))
- # else:
- # print_debug("not in state"+str(state_strings))
- return in_state
- def _perform_actions(ctx: EvalContext, actions: List[Action]):
- for a in actions:
- a.exec(ctx)
- def get_event_params(events: List[InternalEvent], trigger) -> List[any]:
- # Both 'events' and 'self.enabling' are sorted by event ID,
- # this way we have to iterate over each of both lists at most once.
- iterator = iter(trigger.enabling)
- params = []
- try:
- event_decl = next(iterator)
- for e in events:
- if e.id < event_decl.id:
- continue
- else:
- while e.id > event_decl.id:
- event_decl = next(iterator)
- for p in e.params:
- params.append(p)
- except StopIteration:
- pass
- return params
|