tree.py 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333
  1. import termcolor
  2. from typing import *
  3. from sccd.statechart.static.action import *
  4. from sccd.util.bitmap import *
  5. from sccd.util import timer
  @dataclass
  class State:
      """A node in the state tree of a statechart.

      Constructing a state with a non-None ``parent`` appends the new state to
      that parent's ``children`` list. The ``gen`` field is None until a
      StateTree is built over the tree; most methods below read ``gen`` and
      therefore require that to have happened first.
      """

      short_name: str  # value of 'id' attribute in XML
      parent: Optional['State']  # only None if root state
      # scope: Scope # states have their own scope: variables can be declared in <onentry>, subsequently read in guard conditions and actions.
      stable: bool = False  # whether this is a stable state. this field is ignored if maximality semantics is not set to SYNTACTIC
      children: List['State'] = field(default_factory=list)
      default_state: Optional['State'] = None  # child state pointed to by 'initial' attribute
      transitions: List['Transition'] = field(default_factory=list)
      enter: List[Action] = field(default_factory=list)
      exit: List[Action] = field(default_factory=list)
      gen: Optional['StateOptimization'] = None  # generated fields, filled in by StateTree

      def __post_init__(self):
          """Register this state as a child of its parent (if it has one)."""
          if self.parent is not None:
              self.parent.children.append(self)

      def target_states(self, instance, end_of_path) -> Bitmap:
          """Return the bitmap of effective target states for this state.

          When ``end_of_path`` is false only this state's own bit is returned.
          Otherwise the result is the precomputed static bitmap, extended with
          whatever each state in ``gen.dynamic_ts`` resolves to for the given
          ``instance``.
          """
          if not end_of_path:
              return self.gen.state_id_bitmap
          targets = self.gen.static_ts_bitmap
          for s in self.gen.dynamic_ts:
              # Non-augmented '|' so the bitmap stored in 'gen' is never
              # modified, whatever the Bitmap implementation looks like.
              targets = targets | s._target_states(instance)
          return targets

      def _target_states(self, instance) -> Bitmap:
          """Return this state's bit plus the bits reached through the chain of default states."""
          targets = self.gen.state_id_bitmap
          if self.default_state:
              targets = targets | self.default_state._target_states(instance)
          return targets

      def _static_ts(self) -> Tuple[List['State'], List['State']]:
          """Split the default-state chain into (static, dynamic) target states.

          The first list holds this state followed by the static part of its
          default state; the second list is whatever the default state reports
          as dynamic (empty when there is no default state).
          """
          if not self.default_state:
              return ([self], [])
          static, dynamic = self.default_state._static_ts()
          return ([self] + static, dynamic)

      def __repr__(self):
          # 'gen' is only available after a StateTree has been built; fall back
          # to the short name so that printing a state never raises before that.
          name = self.gen.full_name if self.gen is not None else self.short_name
          return "State(\"%s\")" % (name,)
  # Generated fields (for optimization) of a state
  @dataclass(frozen=True)
  class StateOptimization:
      """Immutable per-state data that StateTree computes once and stores in State.gen."""

      # Integer taken from a counter that StateTree increments for every state
      # it visits; a state is numbered before its children are.
      state_id: int
      # bit(state_id)
      state_id_bitmap: Bitmap
      # '/' for the root; otherwise the short names along the path, joined by '/'
      full_name: str
      # order: close to far away, i.e. first element is parent
      ancestors: List[State]
      # union of the state_id_bitmaps of all ancestors
      ancestors_bitmap: Bitmap
      # order: direct children first, followed by each child's own descendants
      descendants: List[State]
      # built from the state IDs of 'descendants'
      descendants_bitmap: Bitmap
      # subset of children: those that are HistoryState instances
      history: List[State]
      # Bitmap of all descendants that are always part of the 'effective target states'
      static_ts_bitmap: Bitmap
      # Subset of descendants containing possible target-states
      dynamic_ts: List[State]
      # True if at least one transition of the state has no trigger
      has_eventless_transitions: bool
      # the AfterTrigger instances among the triggers of the state's transitions
      after_triggers: List['AfterTrigger']
  class HistoryState(State):
      """Common base of the history pseudo-states.

      A history state contributes nothing to the static target states of its
      parent; it is reported as a dynamic target state instead, to be resolved
      against a running instance through ``_target_states``.
      """

      def target_states(self, instance, end_of_path) -> Bitmap:
          """Return the states this history state currently resolves to.

          ``_target_states`` is declared to return a Bitmap, so its result is
          passed through as-is. The previous code iterated over that result as
          if it were a list of states and handed bitmaps to Bitmap.from_list,
          which the rest of this file only ever feeds with integer IDs.
          ``end_of_path`` is not consulted.
          """
          return self._target_states(instance)

      # NOTE: State does not use an ABC metaclass, so this decorator documents
      # the intent but does not prevent instantiation.
      @abstractmethod
      def _target_states(self, instance) -> Bitmap:
          pass

      def _static_ts(self) -> Tuple[List[State], List[State]]:
          """Report no static target states and this state as the only dynamic one."""
          return ([], [self])
  class ShallowHistoryState(HistoryState):
      """History state that re-enters remembered states via their own target_states()."""

      def _target_states(self, instance) -> Bitmap:
          """Resolve this history state against ``instance.history_values``.

          The lookup key is ``self.gen.state_id``: State objects carry their ID
          in the generated ``gen`` record, not as a direct attribute, so the
          former ``self.state_id`` raised AttributeError instead of reaching
          the KeyError fallback.
          """
          try:
              # Only the lookup is guarded, so a KeyError raised while resolving
              # a remembered state is not mistaken for "no history recorded".
              remembered = instance.history_values[self.gen.state_id]
          except KeyError:
              # TODO: is it correct that in this case, the parent itself is also entered? -> Joeri: Nope!
              return self.parent._target_states(instance)

          # presumably 'remembered' is an iterable of State objects - confirm against the code that fills history_values
          targets = Bitmap()
          for hv in remembered:
              targets |= hv.target_states(instance, True)
          return targets
  class DeepHistoryState(HistoryState):
      """History state that resolves directly to the bits of the remembered states."""

      def _target_states(self, instance) -> Bitmap:
          """Resolve this history state against ``instance.history_values``.

          State IDs are read from the generated ``gen`` record, both for the
          lookup key and for the remembered states; State has no direct
          ``state_id`` attribute, so the former code raised AttributeError.
          """
          try:
              remembered = instance.history_values[self.gen.state_id]
          except KeyError:
              # TODO: is it correct that in this case, the parent itself is also entered?
              return self.parent._target_states(instance)

          # presumably 'remembered' is an iterable of State objects - confirm against the code that fills history_values
          return Bitmap.from_list(s.gen.state_id for s in remembered)
  class ParallelState(State):
      """State whose non-history children all take part in its target states."""

      def target_states(self, instance, end_of_path) -> Bitmap:
          """Return the static target bitmap plus all resolved dynamic target states.

          The dynamic part is combined with '|', the same way State.target_states
          does it. The previous code passed the bitmaps to Bitmap.from_list,
          which elsewhere in this file receives integer IDs. ``end_of_path`` is
          not consulted.
          """
          targets = self.gen.static_ts_bitmap
          for s in self.gen.dynamic_ts:
              targets = targets | s._target_states(instance)
          return targets

      def _target_states(self, instance) -> Bitmap:
          """Return this state's bit plus the target bits of every non-history child.

          A Bitmap is returned (not a list), because State._target_states ORs
          the result of its default state's _target_states into a Bitmap.
          """
          targets = self.gen.state_id_bitmap
          for c in self.children:
              if not isinstance(c, HistoryState):
                  targets = targets | c._target_states(instance)
          return targets

      def _static_ts(self) -> Tuple[List[State], List[State]]:
          """Collect the (static, dynamic) target states of this state and all non-history children."""
          static = [self]
          dynamic = []
          for c in self.children:
              if isinstance(c, HistoryState):
                  continue
              c_static, c_dynamic = c._static_ts()
              static.extend(c_static)
              dynamic.extend(c_dynamic)
          return static, dynamic
  @dataclass
  class EventDecl:
      """Declaration of an event: an integer ID, a name and its parameter declarations."""

      id: int
      name: str
      params_decl: List[ParamDecl]

      def render(self) -> str:
          """Return the event name, followed by '(p1, p2, ...)' when parameters are declared."""
          if not self.params_decl:
              return self.name
          rendered_params = ', '.join(p.render() for p in self.params_decl)
          return self.name + '(' + rendered_params + ')'
  @dataclass
  class Trigger:
      """Trigger that is enabled when all of its 'enabling' events are present."""

      enabling: List[EventDecl]

      def __post_init__(self):
          # Optimization: Require 'enabling' to be sorted!
          assert sorted(self.enabling, key=lambda e: e.id) == self.enabling
          self.enabling_bitmap = Bitmap.from_list(e.id for e in self.enabling)

      def check(self, events_bitmap: Bitmap) -> bool:
          """Return True if every enabling event's bit is set in ``events_bitmap``."""
          return (self.enabling_bitmap & events_bitmap) == self.enabling_bitmap

      def render(self) -> str:
          """Return the enabling events rendered and joined with ' ∧ '."""
          return ' ∧ '.join(e.render() for e in self.enabling)

      def copy_params_to_stack(self, ctx: EvalContext):
          """Store the parameters of the enabling events among ``ctx.events``.

          Parameters are written with ``ctx.memory.store(offset, p)`` at
          consecutive offsets starting from 0, in the order the events occur in
          ``ctx.events``.

          Both 'ctx.events' and 'self.enabling' are sorted by event ID, this
          way we have to iterate over each of both lists at most once.
          """
          decls = iter(self.enabling)
          decl = next(decls, None)
          if decl is None:
              return

          offset = 0
          for e in ctx.events:
              # Catch up: skip declarations with an ID smaller than this event's.
              while decl is not None and decl.id < e.id:
                  decl = next(decls, None)
              if decl is None:
                  # No enabling events left; later events cannot match anymore.
                  break
              # Only an exact ID match is an enabling event. Without this check
              # the parameters of an unrelated event whose ID lies between two
              # enabling IDs would be stored as well, shifting all later offsets.
              if decl.id == e.id:
                  for p in e.params:
                      ctx.memory.store(offset, p)
                      offset += 1
  @dataclass
  class NegatedTrigger(Trigger):
      """Trigger that additionally requires none of its 'disabling' events to be present."""

      disabling: List[EventDecl]

      def __post_init__(self):
          Trigger.__post_init__(self)
          self.disabling_bitmap = Bitmap.from_list(e.id for e in self.disabling)

      def check(self, events_bitmap: Bitmap) -> bool:
          """Return True if all enabling events and no disabling event are in ``events_bitmap``."""
          return Trigger.check(self, events_bitmap) and not (self.disabling_bitmap & events_bitmap)

      def render(self) -> str:
          """Return the enabling events followed by the '¬'-prefixed disabling events, joined with ' ∧ '.

          An empty side is left out entirely, so no dangling ' ∧ ' is produced
          when there are no enabling or no disabling events. With both sides
          non-empty the output is unchanged.
          """
          positive = Trigger.render(self)
          negative = ' ∧ '.join('¬'+e.render() for e in self.disabling)
          return ' ∧ '.join(part for part in (positive, negative) if part)
  class AfterTrigger(Trigger):
      """Trigger for an 'after' event, enabled by a single parameterless event declaration."""

      def __init__(self, id: int, name: str, after_id: int, delay: Expression):
          # The only enabling event is built from the given id and name.
          timeout_event = EventDecl(id=id, name=name, params_decl=[])
          super().__init__([timeout_event])
          self.id = id
          self.name = name
          self.after_id = after_id  # unique ID for AfterTrigger
          self.delay = delay

      def render(self) -> str:
          """Return 'after(<rendered delay>)'."""
          rendered_delay = self.delay.render()
          return "after(" + rendered_delay + ")"

      # An 'after'-event also has 1 parameter, but it is not accessible to the user,
      # hence the override.
      def copy_params_to_stack(self, ctx: EvalContext):
          pass
  @dataclass
  class Transition:
      """A transition from a source state to one or more target states.

      The ``gen`` field is None until StateTree fills it in with a
      TransitionOptimization.
      """

      source: State
      targets: List[State]
      scope: Scope
      target_string: Optional[str] = None
      guard: Optional[Expression] = None
      actions: List[Action] = field(default_factory=list)
      trigger: Optional[Trigger] = None  # StateTree treats None as an eventless transition
      gen: Optional['TransitionOptimization'] = None

      def __repr__(self):
          # Shows the full names of the source and of the first target only;
          # both are read from 'gen', so the tree must have been built.
          source_name = self.source.gen.full_name
          first_target_name = self.targets[0].gen.full_name
          label = "%s 🡪 %s" % (source_name, first_target_name)
          return termcolor.colored(label, 'green')
  # Generated fields (for optimization) of a transition
  @dataclass(frozen=True)
  class TransitionOptimization:
      """Immutable per-transition data that StateTree computes once and stores in Transition.gen."""

      # least-common ancestor determined from the source and the first target
      lca: State
      # bits of 'lca' itself and of all its descendants
      lca_bitmap: Bitmap
      # 'lca', or its closest ancestor that is not a ParallelState
      arena: State
      # bits of 'arena' itself and of all its descendants
      arena_bitmap: Bitmap
  class StateTree:
      """Index over a state/transition tree; building it fills in all 'gen' fields."""

      # root: The root state of a state,transition tree structure with all fields filled in,
      #       except the 'gen' fields. This function will fill in the 'gen' fields.
      def __init__(self, root: State):
          timer.start("optimize tree")

          self.state_dict = {}  # mapping from 'full name' to State
          self.state_list = []  # depth-first list of states
          self.transition_list = []  # all transitions in the tree, sorted by source state, depth-first
          self.after_triggers = []
          self.stable_bitmap = Bitmap()  # bitmap of state IDs of states that are stable. Only used for SYNTACTIC-maximality semantics.

          next_id = 0

          def init_state(state: State, parent_full_name: str, ancestors: List[State], ancestors_bitmap):
              # Visits 'state' and, recursively, its children. A state gets its
              # ID before its children do, but its 'gen' record only after all
              # children have theirs (the descendant lists depend on them).
              nonlocal next_id

              state_id = next_id
              next_id += 1
              state_id_bitmap = bit(state_id)

              if state is root:
                  full_name = '/'
              else:
                  # Children of the root do not repeat the root's own '/'.
                  prefix = '' if state.parent is root else parent_full_name
                  full_name = prefix + '/' + state.short_name

              self.state_dict[full_name] = state
              self.state_list.append(state)

              static_ts, dynamic_ts = state._static_ts()

              has_eventless_transitions = False
              after_triggers = []
              for t in state.transitions:
                  self.transition_list.append(t)
                  trigger = t.trigger
                  if trigger is None:
                      has_eventless_transitions = True
                  elif isinstance(trigger, AfterTrigger):
                      after_triggers.append(trigger)
                      self.after_triggers.append(trigger)

              history = []
              child_ancestors = [state] + ancestors
              child_ancestors_bitmap = state_id_bitmap | ancestors_bitmap
              for c in state.children:
                  init_state(c, full_name, list(child_ancestors), child_ancestors_bitmap)
                  if isinstance(c, HistoryState):
                      history.append(c)

              # Direct children first, then the descendants of each child.
              descendants = list(state.children)
              for c in state.children:
                  descendants.extend(c.gen.descendants)

              descendants_bitmap = Bitmap.from_list(s.gen.state_id for s in descendants)
              # 'state' itself may be part of static_ts but has no 'gen' yet:
              # it is filtered out here and its own bit is OR-ed in instead.
              static_ts_bitmap = Bitmap.from_list(s.gen.state_id for s in static_ts if s.gen) | state_id_bitmap

              state.gen = StateOptimization(
                  state_id=state_id,
                  state_id_bitmap=state_id_bitmap,
                  full_name=full_name,
                  ancestors=ancestors,
                  ancestors_bitmap=ancestors_bitmap,
                  descendants=descendants,
                  descendants_bitmap=descendants_bitmap,
                  history=history,
                  static_ts_bitmap=static_ts_bitmap,
                  dynamic_ts=dynamic_ts,
                  has_eventless_transitions=has_eventless_transitions,
                  after_triggers=after_triggers)

              if state.stable:
                  self.stable_bitmap |= bit(state_id)

          def find_lca(t):
              # the least-common ancestor can be computed statically
              first_target = t.targets[0]
              if t.source in first_target.gen.ancestors:
                  return t.source
              if t.source.parent != first_target.parent:  # external
                  for a in t.source.gen.ancestors:
                      if a in first_target.gen.ancestors:
                          return a
              return t.source.parent

          init_state(root, "", [], Bitmap())
          self.root = root

          for t in self.transition_list:
              lca = find_lca(t)

              # The arena is the LCA, or its closest ancestor that is not a ParallelState.
              arena = lca
              while isinstance(arena, ParallelState):
                  arena = arena.parent

              t.gen = TransitionOptimization(
                  lca=lca,
                  lca_bitmap=lca.gen.descendants_bitmap | lca.gen.state_id_bitmap,
                  arena=arena,
                  arena_bitmap=arena.gen.descendants_bitmap | arena.gen.state_id_bitmap)

          timer.stop("optimize tree")