# statechart_execution.py
  1. from typing import *
  2. from sccd.statechart.static.statechart import *
  3. from sccd.statechart.dynamic.event import *
  4. from sccd.util.debug import print_debug
  5. from sccd.util.bitmap import *
  6. from sccd.action_lang.static.scope import *
  7. from sccd.common.exceptions import *
  8. from sccd.util import timer
  9. # Set of current states etc.
  10. class StatechartExecution:
  11. __slots__ = ["instance", "statechart", "gc_memory", "rhs_memory", "raise_internal", "raise_output", "configuration", "history_values", "timer_ids", "schedule_callback", "cancel_callback"]
  12. def __init__(self, instance, statechart: Statechart):
  13. self.instance = instance
  14. self.statechart = statechart
  15. self.gc_memory = None
  16. self.rhs_memory = None
  17. self.raise_internal = None
  18. self.raise_output = None
  19. # set of current states
  20. self.configuration: Bitmap = Bitmap()
  21. # Mapping from history_id to set of states to enter if history is target of transition
  22. self.history_values: List[Bitmap] = list(statechart.tree.initial_history_values)
  23. # Scheduled IDs for after triggers
  24. self.timer_ids = [None] * statechart.tree.timer_count
  25. # enter default states
  26. def initialize(self):
  27. self.configuration = self.statechart.tree.initial_states
  28. if self.statechart.datamodel is not None:
  29. self.statechart.datamodel.exec(self.rhs_memory)
  30. ctx = EvalContext(memory=self.rhs_memory, execution=self, params=[])
  31. for state in self.statechart.tree.bitmap_to_states(self.configuration):
  32. print_debug(termcolor.colored(' ENTER %s'%state.full_name, 'green'))
  33. _perform_actions(ctx, state.enter)
  34. self._start_timers(state.after_triggers)
  35. self.rhs_memory.flush_transition()
  36. self.rhs_memory.flush_round()
  37. self.gc_memory.flush_round()
  38. # events: list SORTED by event id
  39. def fire_transition(self, events: List[InternalEvent], t: Transition):
  40. try:
  41. with timer.Context("transition"):
  42. # Sequence of exit states is the intersection between set of current states and the arena's descendants.
  43. with timer.Context("exit set"):
  44. exit_ids = self.configuration & t.exit_mask
  45. exit_set = self.statechart.tree.bitmap_to_states_reverse(exit_ids)
  46. with timer.Context("enter set"):
  47. # Sequence of enter states is more complex but has for a large part already been computed statically.
  48. enter_ids = t.enter_states_static
  49. if t.target_history_id is not None:
  50. enter_ids |= self.history_values[t.target_history_id]
  51. enter_set = self.statechart.tree.bitmap_to_states(enter_ids)
  52. ctx = EvalContext(memory=self.rhs_memory, execution=self, params=get_event_params(events, t.trigger))
  53. print_debug("fire " + str(t))
  54. with timer.Context("exit states"):
  55. just_exited = None
  56. for s in exit_set:
  57. print_debug(termcolor.colored(' EXIT %s' % s.full_name, 'green'))
  58. if s.deep_history is not None:
  59. # s has a deep-history child:
  60. history_id, history_mask, _ = s.deep_history
  61. self.history_values[history_id] = exit_ids & history_mask
  62. if s.shallow_history is not None:
  63. history_id, _ = s.shallow_history
  64. self.history_values[history_id] = just_exited.effective_targets
  65. self._cancel_timers(s.after_triggers)
  66. _perform_actions(ctx, s.exit)
  67. self.configuration &= ~s.state_id_bitmap
  68. just_exited = s
  69. # execute transition action(s)
  70. with timer.Context("actions"):
  71. _perform_actions(ctx, t.actions)
  72. with timer.Context("enter states"):
  73. for s in enter_set:
  74. print_debug(termcolor.colored(' ENTER %s' % s.full_name, 'green'))
  75. self.configuration |= s.state_id_bitmap
  76. _perform_actions(ctx, s.enter)
  77. self._start_timers(s.after_triggers)
  78. self.rhs_memory.flush_transition()
  79. # input(">")
  80. except Exception as e:
  81. e.args = ("During execution of transition %s:\n" % str(t) +str(e),)
  82. raise
  83. def check_guard(self, t: Transition, events: List[InternalEvent]) -> bool:
  84. try:
  85. if t.guard is None:
  86. return True
  87. else:
  88. result = t.guard.eval(self.gc_memory)(self.gc_memory, self.configuration, *get_event_params(events, t.trigger))
  89. return result
  90. except Exception as e:
  91. e.args = ("While checking guard of transition %s:\n" % str(t) + str(e),)
  92. raise
  93. def _start_timers(self, triggers: List[AfterTrigger]):
  94. for after in triggers:
  95. delay: Duration = after.delay.eval(
  96. EvalContext(memory=self.gc_memory, execution=self, params=[]))
  97. self.timer_ids[after.after_id] = self.schedule_callback(delay, InternalEvent(id=after.id, name=after.name, params=[]), [self.instance])
  98. def _cancel_timers(self, triggers: List[AfterTrigger]):
  99. for after in triggers:
  100. if self.timer_ids[after.after_id] is not None:
  101. self.cancel_callback(self.timer_ids[after.after_id])
  102. self.timer_ids[after.after_id] = None
  103. # Return whether the current configuration includes ALL the states given.
  104. def in_state(self, state_strings: List[str]) -> bool:
  105. try:
  106. state_ids_bitmap = bm_union(self.statechart.tree.state_dict[s].state_id_bitmap for s in state_strings)
  107. except KeyError as e:
  108. raise ModelRuntimeError("INSTATE argument %s: invalid state" % str(e)) from e
  109. in_state = bm_has_all(self.configuration, state_ids_bitmap)
  110. # if in_state:
  111. # print_debug("in state"+str(state_strings))
  112. # else:
  113. # print_debug("not in state"+str(state_strings))
  114. return in_state
  115. def _perform_actions(ctx: EvalContext, actions: List[Action]):
  116. for a in actions:
  117. a.exec(ctx)
  118. def get_event_params(events: List[InternalEvent], trigger) -> List[any]:
  119. # Both 'events' and 'self.enabling' are sorted by event ID,
  120. # this way we have to iterate over each of both lists at most once.
  121. iterator = iter(trigger.enabling)
  122. params = []
  123. try:
  124. event_decl = next(iterator)
  125. for e in events:
  126. if e.id < event_decl.id:
  127. continue
  128. else:
  129. while e.id > event_decl.id:
  130. event_decl = next(iterator)
  131. for p in e.params:
  132. params.append(p)
  133. except StopIteration:
  134. pass
  135. return params