# priority.py (8.4 KB)
  1. from sccd.statechart.static.tree import StateTree, Transition, State, AndState, OrState
  2. from sccd.statechart.static import concurrency
  3. from sccd.statechart.static.semantic_configuration import *
  4. from sccd.util.graph import strongly_connected_components
  5. from typing import *
  6. from sccd.util.visit_tree import visit_tree
  7. from sccd.util.bitmap import *
  8. import collections
  9. import itertools
  10. # Pseudo-vertices reduce the amount of edges in the priority graph
  11. class PseudoVertex:
  12. pass
# A vertex of the priority graph: either a real transition or a pseudo-vertex.
Vertex = Union[Transition, PseudoVertex]
# The priority graph as a list of directed edges (high, low), where 'high' has
# priority over 'low'.
EdgeList = List[Tuple[Vertex,Vertex]]
  15. # explicit ordering of orthogonal regions
  16. def explicit_ortho(tree: StateTree) -> EdgeList:
  17. edges: EdgeList = []
  18. # get all outgoing transitions of state or one of its descendants
  19. def get_transitions(s: State) -> List[Transition]:
  20. transitions = []
  21. def visit_state(s: State, _=None):
  22. transitions.extend(s.transitions)
  23. visit_tree(s, lambda s: s.real_children, parent_first=[visit_state])
  24. return transitions
  25. # create edges between transitions in one region to another
  26. def visit_parallel_state(s: State, _=None):
  27. if isinstance(s.type, AndState):
  28. prev = []
  29. # s.real_children are the orthogonal regions in document order
  30. for region in s.real_children:
  31. curr = get_transitions(region)
  32. if len(curr) > 0: # skip empty regions
  33. # instead of creating edges between all transitions in component 'prev' and all transitions in component 'curr' (|prev| x |curr| edges), we add a pseudo-vertex in the graph between them, so we only have to create |prev| + |curr| edges, expressing the same information.
  34. if len(prev) > 0:
  35. connector = PseudoVertex()
  36. edges.extend((t, connector) for t in prev)
  37. edges.extend((connector, t) for t in curr)
  38. prev = curr
  39. visit_tree(tree.root, lambda s: s.real_children,
  40. parent_first=[visit_parallel_state])
  41. return edges
  42. # explicit ordering of outgoing transitions of the same state
  43. def explicit_same_state(tree: StateTree) -> EdgeList:
  44. edges: EdgeList = []
  45. def visit_state(s: State, _=None):
  46. prev = None
  47. # s.transitions are s' outgoing transitions in document order
  48. for t in s.transitions:
  49. if prev is not None:
  50. edges.append((prev, t))
  51. prev = t
  52. visit_tree(tree.root, lambda s: s.real_children,
  53. parent_first=[visit_state])
  54. return edges
  55. # hierarchical Source-Parent ordering
  56. def source_parent(tree: StateTree) -> EdgeList:
  57. edges: EdgeList = []
  58. def visit_state(s: State, parent_transitions: List[Transition] = []) -> List[Transition]:
  59. if len(s.transitions) > 0: # skip states without transitions
  60. edges.extend(itertools.product(parent_transitions, s.transitions))
  61. return s.transitions
  62. return parent_transitions
  63. visit_tree(tree.root, lambda s: s.real_children, parent_first=[visit_state])
  64. return edges
  65. # hierarchical Source-Child ordering
  66. def source_child(tree: StateTree) -> EdgeList:
  67. edges: EdgeList = []
  68. def visit_state(s: State, ts: List[List[Transition]]) -> List[Transition]:
  69. children_transitions = list(itertools.chain.from_iterable(ts))
  70. if len(s.transitions) > 0: # skip states without transitions
  71. edges.extend(itertools.product(children_transitions, s.transitions))
  72. return s.transitions
  73. else:
  74. return children_transitions
  75. visit_tree(tree.root, lambda s: s.real_children, child_first=[visit_state])
  76. return edges
  77. # hierarchical Arena-Parent ordering
  78. def arena_parent(tree: StateTree) -> EdgeList:
  79. edges: EdgeList = []
  80. partitions = collections.defaultdict(list) # mapping of transition's arena depth to list of transitions
  81. for t in tree.transition_list:
  82. partitions[t.arena.depth].append(t)
  83. ordered_partitions = sorted(partitions.items(), key=lambda tup: tup[0])
  84. prev = []
  85. for depth, curr in ordered_partitions:
  86. edges.extend(itertools.product(prev, curr))
  87. prev = curr
  88. return edges
  89. # hierarchical Arena-Child ordering
  90. def arena_child(tree: StateTree) -> EdgeList:
  91. edges: EdgeList = []
  92. partitions = collections.defaultdict(list) # mapping of transition's arena depth to list of transitions
  93. for t in tree.transition_list:
  94. partitions[t.arena.depth].append(t)
  95. ordered_partitions = sorted(partitions.items(), key=lambda tup: -tup[0])
  96. prev = []
  97. for depth, curr in ordered_partitions:
  98. edges.extend(itertools.product(prev, curr))
  99. prev = curr
  100. return edges
  101. # no priority
  102. def none(tree: StateTree) -> EdgeList:
  103. return []
  104. # Get the (partial) priority ordering between transitions in the state tree, according to given semantics, as a graph
  105. def get_graph(tree: StateTree, semantics: SemanticConfiguration) -> EdgeList:
  106. hierarchical = {
  107. HierarchicalPriority.NONE: none,
  108. HierarchicalPriority.SOURCE_PARENT: source_parent,
  109. HierarchicalPriority.SOURCE_CHILD: source_child,
  110. HierarchicalPriority.ARENA_PARENT: arena_parent,
  111. HierarchicalPriority.ARENA_CHILD: arena_child,
  112. }[semantics.hierarchical_priority]
  113. same_state = {
  114. SameSourcePriority.NONE: none,
  115. SameSourcePriority.EXPLICIT: explicit_same_state,
  116. }[semantics.same_source_priority]
  117. orthogonal = {
  118. OrthogonalPriority.NONE: none,
  119. OrthogonalPriority.EXPLICIT: explicit_ortho,
  120. }[semantics.orthogonal_priority]
  121. edges = list(itertools.chain.from_iterable(p(tree) for p in [hierarchical, same_state, orthogonal]))
  122. return edges
  123. # Checks whether the 'priorities' given yield a valid ordering of transitions in the statechart.
  124. # Returns list of all transitions in statechart, ordered by priority (high -> low).
  125. def generate_total_ordering(tree: StateTree, graph: EdgeList, consistency: concurrency.SmallStepConsistency) -> List[Transition]:
  126. # "edges" is a list of pairs (t1, t2) of transitions, where t1 has higher priority than t2.
  127. edges = graph
  128. scc = strongly_connected_components(edges)
  129. if len(scc) != len(tree.transition_list):
  130. # Priority graph contains cycles
  131. for component in scc:
  132. if len(component) > 1:
  133. raise ModelStaticError("Cycle among transition priorities: " + str(component))
  134. total_ordering = []
  135. remaining_transitions = set(tree.transition_list)
  136. remaining_edges = edges
  137. while len(remaining_edges) > 0:
  138. # 1. Find set of highest-priority transitions (= the ones that have no incoming edges)
  139. # Such a set must exist, because we've already assured that are no cycles in the graph.
  140. highs = set() # all transitions that have outgoing edges ("higher priority")
  141. lows = set() # all transitions that have incoming edges ("lower priority")
  142. for high, low in remaining_edges:
  143. highs.add(high)
  144. lows.add(low)
  145. highest_priority = highs - lows
  146. # pseudo-vertices filtered from it:
  147. highest_priority_transitions = set(t for t in highest_priority if not isinstance(t, PseudoVertex))
  148. # 2. Check if the transitions in this set are allowed to have equal priority.
  149. concurrency.check_nondeterminism(tree, highest_priority_transitions, consistency) # may raise Exception
  150. # 3. All good. Add the transitions in the highest-priority set in any order to the total ordering
  151. total_ordering.extend(highest_priority_transitions)
  152. # 4. Remove the transitions of the highest-priority set from the graph, and repeat.
  153. remaining_edges = [(high,low) for high, low in remaining_edges if high not in highest_priority]
  154. remaining_transitions -= highest_priority_transitions
  155. # Finally, there may be transitions that occur in the priority graph only as vertices, e.g. in flat statecharts:
  156. concurrency.check_nondeterminism(tree, remaining_transitions, consistency)
  157. total_ordering.extend(remaining_transitions)
  158. return total_ordering
  159. def priority_and_concurrency(statechart):
  160. graph = get_graph(statechart.tree, statechart.semantics)
  161. consistency = {
  162. Concurrency.SINGLE: concurrency.NoConcurrency(),
  163. Concurrency.MANY: concurrency.ArenaOrthogonal(),
  164. }[statechart.semantics.concurrency]
  165. priority_ordered_transitions = generate_total_ordering(statechart.tree, graph, consistency)
  166. return priority_ordered_transitions