ソースを参照

Cleaning some stuff up

Joeri Exelmans 4 年 前
コミット
78a2d6ad8c

+ 1 - 1
src/sccd/statechart/dynamic/round.py

@@ -337,7 +337,7 @@ class SmallStep(Round):
             arena = t.opt.arena_bitmap
             self.execution.fire_transition(enabled_events, t)
             dirty_arenas |= arena
-            if t.targets[0].stable:
+            if t.opt.target_stable:
                 stable_arenas |= arena
             enabled_events = self.enabled_events()
             enabled_events.sort(key=lambda e: e.id)

+ 6 - 9
src/sccd/statechart/dynamic/statechart_execution.py

@@ -27,7 +27,7 @@ class StatechartExecution:
         self.history_values: List[Bitmap] = list(statechart.tree.initial_history_values)
 
         # Scheduled IDs for after triggers
-        self.timer_ids = [None] * len(statechart.tree.after_triggers)
+        self.timer_ids = [None] * statechart.tree.timer_count
 
     # enter default states
     def initialize(self):
@@ -37,7 +37,7 @@ class StatechartExecution:
         if self.statechart.datamodel is not None:
             self.statechart.datamodel.exec(self.rhs_memory)
 
-        for state in self._ids_to_states(bm_items(self.configuration)):
+        for state in self.statechart.tree.bitmap_to_states(self.configuration):
             print_debug(termcolor.colored('  ENTER %s'%state.opt.full_name, 'green'))
             _perform_actions(ctx, state.enter)
             self._start_timers(state.opt.after_triggers)
@@ -46,24 +46,21 @@ class StatechartExecution:
         self.rhs_memory.flush_round()
         self.gc_memory.flush_round()
 
-    def _ids_to_states(self, id_iter):
-        return (self.statechart.tree.state_list[id] for id in id_iter)
-
     # events: list SORTED by event id
     def fire_transition(self, events: List[InternalEvent], t: Transition):
         try:
             with timer.Context("transition"):
                 # Sequence of exit states is the intersection between set of current states and the arena's descendants.
                 with timer.Context("exit set"):
-                    exit_ids = self.configuration & t.opt.arena.opt.descendants
-                    exit_set = self._ids_to_states(bm_reverse_items(exit_ids))
+                    exit_ids = self.configuration & t.opt.exit_mask
+                    exit_set = self.statechart.tree.bitmap_to_states_reverse(exit_ids)
 
                 with timer.Context("enter set"):
                     # Sequence of enter states is more complex but has for a large part already been computed statically.
                     enter_ids = t.opt.enter_states_static
                     if t.opt.target_history_id is not None:
                         enter_ids |= self.history_values[t.opt.target_history_id]
-                    enter_set = self._ids_to_states(bm_items(enter_ids))
+                    enter_set = self.statechart.tree.bitmap_to_states(enter_ids)
 
                 ctx = EvalContext(execution=self, events=events, memory=self.rhs_memory)
 
@@ -139,7 +136,7 @@ class StatechartExecution:
     # Return whether the current configuration includes ALL the states given.
     def in_state(self, state_strings: List[str]) -> bool:
         try:
-            state_ids_bitmap = bm_union(self.statechart.tree.state_dict[state_string].opt.state_id_bitmap for state_string in state_strings)
+            state_ids_bitmap = bm_union(self.statechart.tree.state_dict[s].opt.state_id_bitmap for s in state_strings)
         except KeyError as e:
             raise ModelRuntimeError("INSTATE argument %s: invalid state" % str(e)) from e
         in_state = bm_has_all(self.configuration, state_ids_bitmap)

+ 1 - 1
src/sccd/statechart/parser/statechart.g

@@ -8,7 +8,7 @@
 
 // Parsing target of a transition as a sequence of nodes
 
-state_ref: path | "(" path ("," path)+ ")" 
+state_ref: path
 
 ?path: absolute_path | relative_path 
 absolute_path: _PATH_SEP _path_sequence

+ 3 - 3
src/sccd/statechart/parser/xml.py

@@ -200,7 +200,7 @@ def statechart_parser_rules(globals, path, load_external = True, parse_f = parse
 
           scope = Scope("event_params", parent=statechart.scope)
           target_string = require_attribute(el, "target")
-          transition = Transition(parent, [], scope, target_string)
+          transition = Transition(source=parent, target=None, scope=scope, target_string=target_string)
 
           have_event_attr = False
           def parse_attr_event(event):
@@ -290,11 +290,11 @@ def statechart_parser_rules(globals, path, load_external = True, parse_f = parse
             return state
 
           try:
-            transition.targets = [find_target(seq) for seq in parse_tree.children]
+            transition.target = find_target(parse_tree.children[0])
           except Exception as e:
             raise XmlErrorElement(t_el, "target=\"%s\": %s" % (transition.target_string, str(e))) from e
 
-        statechart.tree = optimize_tree(root)
+        statechart.tree = StateTree(root)
 
       return (state_child_rules(root, sibling_dict=children_dict), finish_root)
 

+ 187 - 203
src/sccd/statechart/static/tree.py

@@ -26,23 +26,24 @@ class State(Freezable):
         self.enter: List[Action] = []
         self.exit: List[Action] = []
 
-        self.opt: Optional['StateOptimization'] = None
+        # Statically computed stuff from tree structure:
+        self.opt: Optional['StateStatic'] = None
 
         if self.parent is not None:
             self.parent.children.append(self)
 
-    # Subset of descendants that are always entered when this state is the target of a transition, minus history states.
-    def _effective_targets(self) -> Bitmap:
+    # Subset of descendants that are always entered when this state is the target of a transition, minus history states. Recursive implementation, so better to use self.opt.effective_targets, which is computed statically
+    def effective_targets(self) -> List['State']:
         if self.default_state:
-            # this state + recursion on 'default state'
-            return self.opt.state_id_bitmap | self.default_state._effective_targets() 
+            # or-state: this state + recursion on 'default state'
+            return [self] + self.default_state.effective_targets()
         else:
-            # only this state
-            return self.opt.state_id_bitmap
+            # basic state
+            return [self]
 
     # States that are always entered when this state is part of the "enter path", but not the actual target of a transition.
-    def _additional_effective_targets(self, exclude: 'State') -> Tuple[Bitmap, List['HistoryState']]:
-        return self.opt.state_id_bitmap # only this state
+    def additional_effective_targets(self, exclude: 'State') -> List['State']:
+        return [self]
 
     def __repr__(self):
         return "State(\"%s\")" % (self.short_name)
@@ -53,46 +54,31 @@ class HistoryState(State):
     def __init__(self, short_name: str, parent: Optional['State']):
         super().__init__(short_name, parent)
 
-        self.history_id: Optional[int] = None
+    def effective_targets(self) -> List['State']:
+        return []
 
-    # # Set of states that may be history values.
-    # @abstractmethod
-    # def history_mask(self) -> Bitmap:
-    #     pass
-
-    def _effective_targets(self) -> Bitmap:
-        return Bitmap()
-
-    def _additional_effective_targets(self, exclude: 'State') -> Bitmap:
+    def additional_effective_targets(self, exclude: 'State') -> List['State']:
         assert False # history state cannot have children and therefore should never occur in a "enter path"
 
 class ShallowHistoryState(HistoryState):
 
-    # def history_mask(self) -> Bitmap:
-    #     # Only direct children of parent:
-    #     return bm_union(s.opt.state_id_bitmap for s in self.parent.children)
-
     def __repr__(self):
         return "ShallowHistoryState(\"%s\")" % (self.short_name)
 
 class DeepHistoryState(HistoryState):
 
-    def history_mask(self) -> Bitmap:
-        # All descendants of parent:
-        return self.parent.opt.descendants
-
     def __repr__(self):
         return "DeepHistoryState(\"%s\")" % (self.short_name)
 
 class ParallelState(State):
 
-    def _effective_targets(self) -> Bitmap:
+    def effective_targets(self) -> List['State']:
         # this state + recursive on all children that are not a history state
-        return bm_union(c._effective_targets() for c in self.children if not isinstance(c, HistoryState)) | self.opt.state_id_bitmap
+        return self.additional_effective_targets(exclude=None)
 
-    def _additional_effective_targets(self, exclude: 'State') -> Bitmap:
-        # 
-        return self._effective_targets() & ~exclude._effective_targets()
+    def additional_effective_targets(self, exclude: 'State') -> List['State']:
+        # this state + all effective targets of children, except for 'excluded state' and history states
+        return [self] + list(itertools.chain(*(c.effective_targets() for c in self.children if not isinstance(c, HistoryState) and c is not exclude)))
 
     def __repr__(self):
         return "ParallelState(\"%s\")" % (self.short_name)
@@ -186,13 +172,13 @@ class AfterTrigger(Trigger):
 _empty_trigger = Trigger(enabling=[])
 
 class Transition(Freezable):
-    __slots__ = ["source", "targets", "scope", "target_string", "guard", "actions", "trigger", "opt"]
+    __slots__ = ["source", "target", "scope", "target_string", "guard", "actions", "trigger", "opt"]
 
-    def __init__(self, source: State, targets: List[State], scope: Scope, target_string: Optional[str] = None):
+    def __init__(self, source: State, target: State, scope: Scope, target_string: Optional[str] = None):
         super().__init__()
 
         self.source: State = source
-        self.targets: List[State] = targets
+        self.target: State = target
         self.scope: Scope = scope
 
         self.target_string: Optional[str] = target_string
@@ -201,16 +187,17 @@ class Transition(Freezable):
         self.actions: List[Action] = []
         self.trigger: Trigger = _empty_trigger
 
-        self.opt: Optional['TransitionOptimization'] = None        
+        # Statically computed stuff from tree structure:
+        self.opt: Optional['TransitionStatic'] = None        
 
     def __str__(self):
-        return termcolor.colored("%s 🡪 %s" % (self.source.opt.full_name, self.targets[0].opt.full_name), 'green')
+        return termcolor.colored("%s 🡪 %s" % (self.source.opt.full_name, self.target.opt.full_name), 'green')
 
     __repr__ = __str__
 
 
 # Simply a collection of read-only fields, generated during "optimization" for each state, inferred from the model, i.e. the hierarchy of states and transitions
-class StateOptimization(Freezable):
+class StateStatic(Freezable):
     __slots__ = ["full_name", "depth", "state_id", "state_id_bitmap", "ancestors", "descendants", "effective_targets", "deep_history", "shallow_history", "after_triggers"]
     def __init__(self):
         super().__init__()
@@ -237,188 +224,185 @@ class StateOptimization(Freezable):
 
 
 # Data that is generated for each transition.
-class TransitionOptimization(Freezable):
-    __slots__ = ["arena", "arena_bitmap", "enter_states_static", "target_history_id", "raised_events"]
+class TransitionStatic(Freezable):
+    __slots__ = ["exit_mask", "arena_bitmap", "enter_states_static", "target_history_id", "target_stable", "raised_events"]
 
-    def __init__(self, arena: State, arena_bitmap: Bitmap, enter_states_static: Bitmap, target_history_id: Optional[int], raised_events: Bitmap):
+    def __init__(self, exit_mask: Bitmap, arena_bitmap: Bitmap, enter_states_static: Bitmap, target_history_id: Optional[int], target_stable: bool, raised_events: Bitmap):
         super().__init__()
-        self.arena: State = arena
+        self.exit_mask: State = exit_mask
         self.arena_bitmap: Bitmap = arena_bitmap
         # The "enter set" can be computed partially statically, or entirely statically if there are no history states in it.
         self.enter_states_static: Bitmap = enter_states_static
         self.target_history_id: Optional[int] = target_history_id # History ID if target of transition is a history state, otherwise None.
+        self.target_stable: bool = target_stable # Whether target state is a stable state
         self.raised_events: Bitmap = raised_events # (internal) event IDs raised by this transition
         self.freeze()
 
 
 class StateTree(Freezable):
-    __slots__ = ["root", "transition_list", "state_list", "state_dict", "after_triggers", "stable_bitmap", "initial_history_values", "initial_states"]
+    __slots__ = ["root", "transition_list", "state_list", "state_dict", "timer_count", "initial_history_values", "initial_states"]
 
-    def __init__(self, root: State, transition_list: List[Transition], state_list: List[State], state_dict: Dict[str, State], after_triggers: List[AfterTrigger], stable_bitmap: Bitmap, initial_history_values: List[Bitmap], initial_states: Bitmap):
+    def __init__(self, root: State):
         super().__init__()
         self.root: State = root
-        self.transition_list: List[Transition] = transition_list # depth-first document order
-        self.state_list: List[State] = state_list # depth-first document order
-        self.state_dict: Dict[str, State] = state_dict # mapping from 'full name' to State
-        self.after_triggers: List[AfterTrigger] = after_triggers # all after-triggers in the statechart
-        self.stable_bitmap: Bitmap = stable_bitmap # set of states that are syntactically marked 'stable'
-        self.initial_history_values: List[Bitmap] = initial_history_values # targets of each history state before history has been built.
-        self.initial_states: Bitmap = initial_states
-        self.freeze()
 
-def optimize_tree(root: State) -> StateTree:
-    with timer.Context("optimize tree"):
-
-        def assign_state_id():
-            next_id = 0
-            def f(state: State, _=None):
-                state.opt = StateOptimization()
-
-                nonlocal next_id
-                state.opt.state_id = next_id
-                state.opt.state_id_bitmap = bit(next_id)
-                next_id += 1
-
-            return f
-
-        def assign_full_name(state: State, parent_full_name: str = ""):
-            if state is root:
-                full_name = '/'
-            elif state.parent is root:
-                full_name = '/' + state.short_name
-            else:
-                full_name = parent_full_name + '/' + state.short_name
-            state.opt.full_name = full_name
-            return full_name
-
-        def assign_depth(state: State, parent_depth: int = 0):
-            state.opt.depth = parent_depth + 1
-            return parent_depth + 1
-
-        state_dict = {}
-        state_list = []
-        stable_bitmap = Bitmap()
-        def add_to_list(state: State ,_=None):
-            nonlocal stable_bitmap
-            state_dict[state.opt.full_name] = state
-            state_list.append(state)
-            if state.stable:
-                stable_bitmap |= state.opt.state_id_bitmap
-
-        transition_list = []
-        after_triggers = []
-        def visit_transitions(state: State, _=None):
-                for t in state.transitions:
-                    transition_list.append(t)
-                    if t.trigger and isinstance(t.trigger, AfterTrigger):
-                        state.opt.after_triggers.append(t.trigger)
-                        after_triggers.append(t.trigger)
-
-        def set_ancestors(state: State, ancestors=Bitmap()):
-            state.opt.ancestors = ancestors
-            return ancestors | state.opt.state_id_bitmap
-
-        def set_descendants(state: State, children_descendants):
-            descendants = bm_union(children_descendants)
-            state.opt.descendants = descendants
-            return state.opt.state_id_bitmap | descendants
-
-        def calculate_effective_targets(state: State, _=None):
-            # implementation of "_effective_targets"-method is recursive (slow)
-            # store the result, it is always the same:
-            state.opt.effective_targets = state._effective_targets()
-
-        initial_history_values = []
-        def deal_with_history(state: State, children_history):
-            for h in children_history:
-                if isinstance(h, DeepHistoryState):
-                    state.opt.deep_history = (h.history_id, h.history_mask())
-                elif isinstance(h, ShallowHistoryState):
-                    state.opt.shallow_history = h.history_id
-
-            if isinstance(state, HistoryState):
-                state.history_id = len(initial_history_values)
-                initial_history_values.append(state.parent._effective_targets())
-                return state
-
-        def freeze(state: State, _=None):
-            state.freeze()
-            state.opt.freeze()
-
-        visit_tree(root, lambda s: s.children,
-            before_children=[
-                assign_state_id(),
-                assign_full_name,
-                assign_depth,
-                add_to_list,
-                visit_transitions,
-                set_ancestors,
-            ],
-            after_children=[
-                set_descendants,
-                calculate_effective_targets,
-            ])
-
-        visit_tree(root, lambda s: s.children,
-            after_children=[
-                deal_with_history,
-                freeze,
-            ])
-
-        for t in transition_list:
-            # Arena can be computed statically. First compute Lowest-common ancestor:
-            # Intersection between source & target ancestors, last member in depth-first sorted state list.
-            lca_id = bm_highest_bit(t.source.opt.ancestors & t.targets[0].opt.ancestors)
-            lca = state_list[lca_id]
-            arena = lca
-            # Arena must be an Or-state:
-            while isinstance(arena, (ParallelState, HistoryState)):
-                arena = arena.parent
-
-            # Exit states can be efficiently computed at runtime based on the set of current states.
-            # Enter states are more complex but luckily, can be computed *partially* statically:
-
-            # As a start, we calculate the enter path:
-            # The enter path is the path from arena to the target state (not including the arena state itself).
-            # Enter path is the intersection between:
-            #   1) the transition's target and its ancestors, and
-            #   2) the arena's descendants
-            enter_path = (t.targets[0].opt.state_id_bitmap | t.targets[0].opt.ancestors) & arena.opt.descendants
-            # All states on the enter path will be entered, but on the enter path, there may also be AND-states whose children are not on the enter path, but should also be entered.
-            enter_path_iter = bm_items(enter_path)
-            state_id = next(enter_path_iter, None)
-            enter_states_static = Bitmap()
-            while state_id is not None:
-                state = state_list[state_id]
-                next_state_id = next(enter_path_iter, None)
-                if next_state_id:
-                    # an intermediate state on the path from arena to target
-                    next_state = state_list[next_state_id]
-                    enter_states_static |= state._additional_effective_targets(next_state)
-                else:
-                    # the actual target of the transition
-                    enter_states_static |= state.opt.effective_targets
-                state_id = next_state_id
-
-            target_history_id = None
-            if isinstance(t.targets[0], HistoryState):
-                target_history_id = t.targets[0].history_id
+        self.state_dict = {}
+        self.state_list = []
+        self.transition_list = []
+        self.timer_count = 0 # number of after-transitions in the statechart
+        self.initial_history_values = []
 
+        with timer.Context("optimize tree"):
 
-            raised_events = Bitmap()
-            for a in t.actions:
-                if isinstance(a, RaiseInternalEvent):
-                    raised_events |= bit(a.event_id)
+            def assign_state_id():
+                next_id = 0
+                def f(state: State, _=None):
+                    state.opt = StateStatic()
 
-            t.opt = TransitionOptimization(
-                arena=arena,
-                arena_bitmap=arena.opt.descendants | arena.opt.state_id_bitmap,
-                enter_states_static=enter_states_static,
-                target_history_id=target_history_id,
-                raised_events=raised_events)
+                    nonlocal next_id
+                    state.opt.state_id = next_id
+                    state.opt.state_id_bitmap = bit(next_id)
+                    next_id += 1
 
-            t.freeze()
+                return f
 
-        initial_states = root._effective_targets()
-
-        return StateTree(root, transition_list, state_list, state_dict, after_triggers, stable_bitmap, initial_history_values, initial_states)
+            def assign_full_name(state: State, parent_full_name: str = ""):
+                if state is root:
+                    full_name = '/'
+                elif state.parent is root:
+                    full_name = '/' + state.short_name
+                else:
+                    full_name = parent_full_name + '/' + state.short_name
+                state.opt.full_name = full_name
+                return full_name
+
+            def assign_depth(state: State, parent_depth: int = 0):
+                state.opt.depth = parent_depth + 1
+                return parent_depth + 1
+
+            def add_to_list(state: State ,_=None):
+                self.state_dict[state.opt.full_name] = state
+                self.state_list.append(state)
+
+            def visit_transitions(state: State, _=None):
+                    for t in state.transitions:
+                        self.transition_list.append(t)
+                        if t.trigger and isinstance(t.trigger, AfterTrigger):
+                            state.opt.after_triggers.append(t.trigger)
+                            self.timer_count += 1
+
+            def set_ancestors(state: State, ancestors=Bitmap()):
+                state.opt.ancestors = ancestors
+                return ancestors | state.opt.state_id_bitmap
+
+            def set_descendants(state: State, children_descendants):
+                descendants = bm_union(children_descendants)
+                state.opt.descendants = descendants
+                return state.opt.state_id_bitmap | descendants
+
+            def calculate_effective_targets(state: State, _=None):
+                # implementation of "effective_targets"-method is recursive (slow)
+                # store the result, it is always the same:
+                state.opt.effective_targets = states_to_bitmap(state.effective_targets())
+
+            history_ids = {}
+            def deal_with_history(state: State, children_history):
+                for h in children_history:
+                    if isinstance(h, DeepHistoryState):
+                        state.opt.deep_history = (history_ids[h], h.parent.opt.descendants)
+                    elif isinstance(h, ShallowHistoryState):
+                        state.opt.shallow_history = history_ids[h]
+
+                if isinstance(state, HistoryState):
+                    history_ids[state] = len(self.initial_history_values) # generate history ID
+                    self.initial_history_values.append(state.parent.opt.effective_targets)
+                    return state
+
+            def freeze(state: State, _=None):
+                state.freeze()
+                state.opt.freeze()
+
+            visit_tree(root, lambda s: s.children,
+                before_children=[
+                    assign_state_id(),
+                    assign_full_name,
+                    assign_depth,
+                    add_to_list,
+                    visit_transitions,
+                    set_ancestors,
+                ],
+                after_children=[
+                    set_descendants,
+                    calculate_effective_targets,
+                ])
+
+            self.initial_states = root.opt.effective_targets
+
+            visit_tree(root, lambda s: s.children,
+                after_children=[
+                    deal_with_history,
+                    freeze,
+                ])
+
+            for t in self.transition_list:
+                # Arena can be computed statically. First compute Lowest-common ancestor:
+                # Intersection between source & target ancestors, last member in depth-first sorted state list.
+                lca_id = bm_highest_bit(t.source.opt.ancestors & t.target.opt.ancestors)
+                lca = self.state_list[lca_id]
+                arena = lca
+                # Arena must be an Or-state:
+                while isinstance(arena, (ParallelState, HistoryState)):
+                    arena = arena.parent
+
+                # Exit states can be efficiently computed at runtime based on the set of current states.
+                # Enter states are more complex but luckily, can be computed *partially* statically:
+
+                # As a start, we calculate the enter path:
+                # The enter path is the path from arena to the target state (not including the arena state itself).
+                # Enter path is the intersection between:
+                #   1) the transition's target and its ancestors, and
+                #   2) the arena's descendants
+                enter_path = (t.target.opt.state_id_bitmap | t.target.opt.ancestors) & arena.opt.descendants
+                # All states on the enter path will be entered, but on the enter path, there may also be AND-states whose children are not on the enter path, but should also be entered.
+                enter_path_iter = self.bitmap_to_states(enter_path)
+                entered_state = next(enter_path_iter, None)
+                enter_states_static = Bitmap()
+                while entered_state is not None:
+                    next_entered_state = next(enter_path_iter, None)
+                    if next_entered_state:
+                        # an intermediate state on the path from arena to target
+                        enter_states_static |= states_to_bitmap(entered_state.additional_effective_targets(next_entered_state))
+                    else:
+                        # the actual target of the transition
+                        enter_states_static |= entered_state.opt.effective_targets
+                    entered_state = next_entered_state
+
+                target_history_id = None
+                if isinstance(t.target, HistoryState):
+                    target_history_id = history_ids[t.target]
+
+                raised_events = Bitmap()
+                for a in t.actions:
+                    if isinstance(a, RaiseInternalEvent):
+                        raised_events |= bit(a.event_id)
+
+                t.opt = TransitionStatic(
+                    exit_mask=arena.opt.descendants,
+                    arena_bitmap=arena.opt.descendants | arena.opt.state_id_bitmap,
+                    enter_states_static=enter_states_static,
+                    target_history_id=target_history_id,
+                    target_stable=t.target.stable,
+                    raised_events=raised_events)
+
+                t.freeze()
+
+            self.freeze()
+
+    def bitmap_to_states(self, bitmap: Bitmap) -> Iterator[State]:
+        return (self.state_list[id] for id in bm_items(bitmap))
+
+    def bitmap_to_states_reverse(self, bitmap: Bitmap) -> Iterator[State]:
+        return (self.state_list[id] for id in bm_reverse_items(bitmap))
+
+def states_to_bitmap(states: Iterable[State]) -> Bitmap:
+    return bm_from_list(s.opt.state_id for s in states)