tree.py 17 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413
  1. import termcolor
  2. from typing import *
  3. import itertools
  4. from sccd.statechart.static.action import *
  5. from sccd.util.bitmap import *
  6. from sccd.util import timer
  7. from sccd.util.visit_tree import *
  8. from sccd.util.freezable import *
  9. class State(Freezable):
  10. __slots__ = ["short_name", "parent", "stable", "children", "default_state", "transitions", "enter", "exit", "opt"]
  11. def __init__(self, short_name: str, parent: Optional['State']):
  12. super().__init__()
  13. self.short_name: str = short_name # value of 'id' attribute in XML
  14. self.parent: Optional['State'] = parent # only None if root state
  15. self.stable: bool = False # whether this is a stable stabe. this field is ignored if maximality semantics is not set to SYNTACTIC
  16. self.children: List['State'] = []
  17. self.default_state: 'State' = None # child state pointed to by 'initial' attribute
  18. self.transitions: List['Transition'] = []
  19. self.enter: List[Action] = []
  20. self.exit: List[Action] = []
  21. # Statically computed stuff from tree structure:
  22. self.opt: Optional['StateStatic'] = None
  23. if self.parent is not None:
  24. self.parent.children.append(self)
  25. # Subset of descendants that are always entered when this state is the target of a transition, minus history states. Recursive implementation, so better to use self.opt.effective_targets, which is computed statically
  26. def effective_targets(self) -> List['State']:
  27. if self.default_state:
  28. # or-state: this state + recursion on 'default state'
  29. return [self] + self.default_state.effective_targets()
  30. else:
  31. # basic state
  32. return [self]
  33. # States that are always entered when this state is part of the "enter path", but not the actual target of a transition.
  34. def additional_effective_targets(self, exclude: 'State') -> List['State']:
  35. return [self]
  36. def __repr__(self):
  37. return "State(\"%s\")" % (self.short_name)
  38. class HistoryState(State):
  39. __slots__ = ["history_id"]
  40. def __init__(self, short_name: str, parent: Optional['State']):
  41. super().__init__(short_name, parent)
  42. def effective_targets(self) -> List['State']:
  43. return []
  44. def additional_effective_targets(self, exclude: 'State') -> List['State']:
  45. assert False # history state cannot have children and therefore should never occur in a "enter path"
  46. class ShallowHistoryState(HistoryState):
  47. def __repr__(self):
  48. return "ShallowHistoryState(\"%s\")" % (self.short_name)
  49. class DeepHistoryState(HistoryState):
  50. def __repr__(self):
  51. return "DeepHistoryState(\"%s\")" % (self.short_name)
  52. class ParallelState(State):
  53. def effective_targets(self) -> List['State']:
  54. # this state + recursive on all children that are not a history state
  55. return self.additional_effective_targets(exclude=None)
  56. def additional_effective_targets(self, exclude: 'State') -> List['State']:
  57. # this state + all effective targets of children, except for 'excluded state' and history states
  58. return [self] + list(itertools.chain(*(c.effective_targets() for c in self.children if not isinstance(c, HistoryState) and c is not exclude)))
  59. def __repr__(self):
  60. return "ParallelState(\"%s\")" % (self.short_name)
  61. @dataclass
  62. class EventDecl:
  63. __slots__ = ["id", "name", "params_decl"]
  64. id: int
  65. name: str
  66. params_decl: List[ParamDecl]
  67. def render(self) -> str:
  68. if self.params_decl:
  69. return self.name + '(' + ', '.join(p.render() for p in self.params_decl) + ')'
  70. else:
  71. return self.name
  72. @dataclass
  73. class Trigger:
  74. __slots__ = ["enabling", "enabling_bitmap"]
  75. enabling: List[EventDecl]
  76. def __post_init__(self):
  77. # Optimization: Require 'enabling' to be sorted!
  78. assert sorted(self.enabling, key=lambda e: e.id) == self.enabling
  79. self.enabling_bitmap = bm_from_list(e.id for e in self.enabling)
  80. def check(self, events_bitmap: Bitmap) -> bool:
  81. return (self.enabling_bitmap & events_bitmap) == self.enabling_bitmap
  82. def render(self) -> str:
  83. return ' ∧ '.join(e.render() for e in self.enabling)
  84. def copy_params_to_stack(self, events: List[InternalEvent], memory: MemoryInterface):
  85. # Both 'events' and 'self.enabling' are sorted by event ID,
  86. # this way we have to iterate over each of both lists at most once.
  87. iterator = iter(self.enabling)
  88. try:
  89. event_decl = next(iterator)
  90. offset = 0
  91. for e in events:
  92. if e.id < event_decl.id:
  93. continue
  94. else:
  95. while e.id > event_decl.id:
  96. event_decl = next(iterator)
  97. for p in e.params:
  98. memory.store(offset, p)
  99. offset += 1
  100. except StopIteration:
  101. pass
  102. @dataclass
  103. class NegatedTrigger(Trigger):
  104. __slots__ = ["disabling", "disabling_bitmap"]
  105. disabling: List[EventDecl]
  106. def __post_init__(self):
  107. Trigger.__post_init__(self)
  108. self.disabling_bitmap = bm_from_list(e.id for e in self.disabling)
  109. def check(self, events_bitmap: Bitmap) -> bool:
  110. return Trigger.check(self, events_bitmap) and not (self.disabling_bitmap & events_bitmap)
  111. def render(self) -> str:
  112. return ' ∧ '.join(itertools.chain((e.render() for e in self.enabling), ('¬'+e.render() for e in self.disabling)))
  113. class AfterTrigger(Trigger):
  114. def __init__(self, id: int, name: str, after_id: int, delay: Expression):
  115. enabling = [EventDecl(id=id, name=name, params_decl=[])]
  116. super().__init__(enabling)
  117. self.id = id
  118. self.name = name
  119. self.after_id = after_id # unique ID for AfterTrigger
  120. self.delay = delay
  121. def render(self) -> str:
  122. return "after("+self.delay.render()+")"
  123. # Override.
  124. # An 'after'-event also has 1 parameter, but it is not accessible to the user,
  125. # hence the override.
  126. def copy_params_to_stack(self, events: List[InternalEvent], memory: MemoryInterface):
  127. pass
  128. _empty_trigger = Trigger(enabling=[])
  129. class Transition(Freezable):
  130. __slots__ = ["source", "target", "scope", "target_string", "guard", "actions", "trigger", "opt"]
  131. def __init__(self, source: State, target: State, scope: Scope, target_string: Optional[str] = None):
  132. super().__init__()
  133. self.source: State = source
  134. self.target: State = target
  135. self.scope: Scope = scope
  136. self.target_string: Optional[str] = target_string
  137. self.guard: Optional[Expression] = None
  138. self.actions: List[Action] = []
  139. self.trigger: Trigger = _empty_trigger
  140. # Statically computed stuff from tree structure:
  141. self.opt: Optional['TransitionStatic'] = None
  142. def __str__(self):
  143. return termcolor.colored("%s 🡪 %s" % (self.source.opt.full_name, self.target.opt.full_name), 'green')
  144. __repr__ = __str__
  145. # Simply a collection of read-only fields, generated during "optimization" for each state, inferred from the model, i.e. the hierarchy of states and transitions
  146. class StateStatic(Freezable):
  147. __slots__ = ["full_name", "depth", "state_id", "state_id_bitmap", "ancestors", "descendants", "effective_targets", "deep_history", "shallow_history", "after_triggers"]
  148. def __init__(self):
  149. super().__init__()
  150. self.full_name: str = ""
  151. self.depth: int -1 # Root is 0, root's children are 1, and so on
  152. self.state_id: int = -1
  153. self.state_id_bitmap: Bitmap = Bitmap() # bitmap with only state_id-bit set
  154. self.ancestors: Bitmap = Bitmap()
  155. self.descendants: Bitmap = Bitmap()
  156. self.effective_targets: Bitmap = Bitmap()
  157. # If a direct child of this state is a deep history state, then "deep history" needs to be recorded when exiting this state. This value contains a tuple, with the (history-id, history_mask) of that child state.
  158. self.deep_history: Optional[Tuple[int, Bitmap]] = None
  159. # If a direct child of this state is a shallow history state, then "shallow history" needs to be recorded when exiting this state. This value is the history-id of that child state
  160. self.shallow_history: Optional[int] = None
  161. # Triggers of outgoing transitions that are AfterTrigger.
  162. self.after_triggers: List[AfterTrigger] = []
  163. # Data that is generated for each transition.
  164. class TransitionStatic(Freezable):
  165. __slots__ = ["exit_mask", "arena_bitmap", "enter_states_static", "target_history_id", "target_stable", "raised_events"]
  166. def __init__(self, exit_mask: Bitmap, arena_bitmap: Bitmap, enter_states_static: Bitmap, target_history_id: Optional[int], target_stable: bool, raised_events: Bitmap):
  167. super().__init__()
  168. self.exit_mask: State = exit_mask
  169. self.arena_bitmap: Bitmap = arena_bitmap
  170. # The "enter set" can be computed partially statically, or entirely statically if there are no history states in it.
  171. self.enter_states_static: Bitmap = enter_states_static
  172. self.target_history_id: Optional[int] = target_history_id # History ID if target of transition is a history state, otherwise None.
  173. self.target_stable: bool = target_stable # Whether target state is a stable state
  174. self.raised_events: Bitmap = raised_events # (internal) event IDs raised by this transition
  175. self.freeze()
  176. class StateTree(Freezable):
  177. __slots__ = ["root", "transition_list", "state_list", "state_dict", "timer_count", "initial_history_values", "initial_states"]
  178. def __init__(self, root: State):
  179. super().__init__()
  180. self.root: State = root
  181. self.state_dict = {}
  182. self.state_list = []
  183. self.transition_list = []
  184. self.timer_count = 0 # number of after-transitions in the statechart
  185. self.initial_history_values = []
  186. with timer.Context("optimize tree"):
  187. def assign_state_id():
  188. next_id = 0
  189. def f(state: State, _=None):
  190. state.opt = StateStatic()
  191. nonlocal next_id
  192. state.opt.state_id = next_id
  193. state.opt.state_id_bitmap = bit(next_id)
  194. next_id += 1
  195. return f
  196. def assign_full_name(state: State, parent_full_name: str = ""):
  197. if state is root:
  198. full_name = '/'
  199. elif state.parent is root:
  200. full_name = '/' + state.short_name
  201. else:
  202. full_name = parent_full_name + '/' + state.short_name
  203. state.opt.full_name = full_name
  204. return full_name
  205. def assign_depth(state: State, parent_depth: int = 0):
  206. state.opt.depth = parent_depth + 1
  207. return parent_depth + 1
  208. def add_to_list(state: State ,_=None):
  209. self.state_dict[state.opt.full_name] = state
  210. self.state_list.append(state)
  211. def visit_transitions(state: State, _=None):
  212. for t in state.transitions:
  213. self.transition_list.append(t)
  214. if t.trigger and isinstance(t.trigger, AfterTrigger):
  215. state.opt.after_triggers.append(t.trigger)
  216. self.timer_count += 1
  217. def set_ancestors(state: State, ancestors=Bitmap()):
  218. state.opt.ancestors = ancestors
  219. return ancestors | state.opt.state_id_bitmap
  220. def set_descendants(state: State, children_descendants):
  221. descendants = bm_union(children_descendants)
  222. state.opt.descendants = descendants
  223. return state.opt.state_id_bitmap | descendants
  224. def calculate_effective_targets(state: State, _=None):
  225. # implementation of "effective_targets"-method is recursive (slow)
  226. # store the result, it is always the same:
  227. state.opt.effective_targets = states_to_bitmap(state.effective_targets())
  228. history_ids = {}
  229. def deal_with_history(state: State, children_history):
  230. for h in children_history:
  231. if isinstance(h, DeepHistoryState):
  232. state.opt.deep_history = (history_ids[h], h.parent.opt.descendants)
  233. elif isinstance(h, ShallowHistoryState):
  234. state.opt.shallow_history = history_ids[h]
  235. if isinstance(state, HistoryState):
  236. history_ids[state] = len(self.initial_history_values) # generate history ID
  237. self.initial_history_values.append(state.parent.opt.effective_targets)
  238. return state
  239. def freeze(state: State, _=None):
  240. state.freeze()
  241. state.opt.freeze()
  242. visit_tree(root, lambda s: s.children,
  243. parent_first=[
  244. assign_state_id(),
  245. assign_full_name,
  246. assign_depth,
  247. add_to_list,
  248. visit_transitions,
  249. set_ancestors,
  250. ],
  251. child_first=[
  252. set_descendants,
  253. calculate_effective_targets,
  254. ])
  255. self.initial_states = root.opt.effective_targets
  256. visit_tree(root, lambda s: s.children,
  257. child_first=[
  258. deal_with_history,
  259. freeze,
  260. ])
  261. for t in self.transition_list:
  262. # Arena can be computed statically. First compute Lowest-common ancestor:
  263. arena = self.lca(t.source, t.target)
  264. # Arena must be an Or-state:
  265. while isinstance(arena, (ParallelState, HistoryState)):
  266. arena = arena.parent
  267. # Exit states can be efficiently computed at runtime based on the set of current states.
  268. # Enter states are more complex but luckily, can be computed *partially* statically:
  269. # As a start, we calculate the enter path:
  270. # The enter path is the path from arena to the target state (not including the arena state itself).
  271. # Enter path is the intersection between:
  272. # 1) the transition's target and its ancestors, and
  273. # 2) the arena's descendants
  274. enter_path = (t.target.opt.state_id_bitmap | t.target.opt.ancestors) & arena.opt.descendants
  275. # All states on the enter path will be entered, but on the enter path, there may also be AND-states whose children are not on the enter path, but should also be entered.
  276. enter_path_iter = self.bitmap_to_states(enter_path)
  277. entered_state = next(enter_path_iter, None)
  278. enter_states_static = Bitmap()
  279. while entered_state is not None:
  280. next_entered_state = next(enter_path_iter, None)
  281. if next_entered_state:
  282. # an intermediate state on the path from arena to target
  283. enter_states_static |= states_to_bitmap(entered_state.additional_effective_targets(next_entered_state))
  284. else:
  285. # the actual target of the transition
  286. enter_states_static |= entered_state.opt.effective_targets
  287. entered_state = next_entered_state
  288. target_history_id = None
  289. if isinstance(t.target, HistoryState):
  290. target_history_id = history_ids[t.target]
  291. raised_events = Bitmap()
  292. for a in t.actions:
  293. if isinstance(a, RaiseInternalEvent):
  294. raised_events |= bit(a.event_id)
  295. t.opt = TransitionStatic(
  296. exit_mask=arena.opt.descendants,
  297. arena_bitmap=arena.opt.descendants | arena.opt.state_id_bitmap,
  298. enter_states_static=enter_states_static,
  299. target_history_id=target_history_id,
  300. target_stable=t.target.stable,
  301. raised_events=raised_events)
  302. t.freeze()
  303. self.freeze()
  304. def bitmap_to_states(self, bitmap: Bitmap) -> Iterator[State]:
  305. return (self.state_list[id] for id in bm_items(bitmap))
  306. def bitmap_to_states_reverse(self, bitmap: Bitmap) -> Iterator[State]:
  307. return (self.state_list[id] for id in bm_reverse_items(bitmap))
  308. def lca(self, s1: State, s2: State) -> State:
  309. # Intersection between source & target ancestors, last member in depth-first sorted state list.
  310. return self.state_list[bm_highest_bit(s1.opt.ancestors & s2.opt.ancestors)]
  311. def states_to_bitmap(states: Iterable[State]) -> Bitmap:
  312. return bm_from_list(s.opt.state_id for s in states)
  313. def is_ancestor(parent: State, child: State) -> bool:
  314. return bm_has(child.opt.ancestors, parent.opt.state_id)