|
@@ -30,191 +30,200 @@ def load_expression(parse_node) -> Expression:
|
|
|
return Array(elements)
|
|
|
raise ParseError("Can't handle expression type: "+parse_node.data)
|
|
|
|
|
|
-# A statechart can only be loaded within the context of a model
|
|
|
-def load_statechart(model: Model, dir, sc_node, name="", default: bool = False):
|
|
|
-
|
|
|
- def _load(sc_node) -> Statechart:
|
|
|
-
|
|
|
- def load_action(action_node) -> Optional[Action]:
|
|
|
- tag = ET.QName(action_node).localname
|
|
|
- if tag == "raise":
|
|
|
- name = action_node.get("event")
|
|
|
- port = action_node.get("port")
|
|
|
- if not port:
|
|
|
- event_id = model.event_namespace.assign_id(name)
|
|
|
- return RaiseInternalEvent(name=name, parameters=[], event_id=event_id)
|
|
|
- else:
|
|
|
- if port not in model.outports:
|
|
|
- model.outports.append(port)
|
|
|
- return RaiseOutputEvent(name=name, parameters=[], outport=port, time_offset=0)
|
|
|
- else:
|
|
|
- raise None
|
|
|
-
|
|
|
- # parent_node: XML node containing any number of action nodes as direct children
|
|
|
- def load_actions(parent_node) -> List[Action]:
|
|
|
- return list(filter(lambda x: x is not None, map(lambda child: load_action(child), parent_node)))
|
|
|
-
|
|
|
- transition_nodes: List[Tuple[Any, State]] = [] # List of (<transition>, State) tuples
|
|
|
-
|
|
|
- # Recursively create state hierarchy from XML node
|
|
|
- # Adding <transition> elements to the 'transitions' list as a side effect
|
|
|
- def load_state(state_node) -> Optional[State]:
|
|
|
- state = None
|
|
|
- name = state_node.get("id", "")
|
|
|
- tag = ET.QName(state_node).localname
|
|
|
- if tag == "state":
|
|
|
- state = State(name)
|
|
|
- elif tag == "parallel" :
|
|
|
- state = ParallelState(name)
|
|
|
- elif tag == "history":
|
|
|
- is_deep = state_node.get("type", "shallow") == "deep"
|
|
|
- if is_deep:
|
|
|
- state = DeepHistoryState(name)
|
|
|
- else:
|
|
|
- state = ShallowHistoryState(name)
|
|
|
+# Load state tree from XML <tree> node.
|
|
|
+# Namespace is required for building event namespace and in/outport discovery.
|
|
|
+def load_state_tree(namespace: ModelNamespace, tree_node) -> StateTree:
|
|
|
+ def load_action(action_node) -> Optional[Action]:
|
|
|
+ tag = ET.QName(action_node).localname
|
|
|
+ if tag == "raise":
|
|
|
+ name = action_node.get("event")
|
|
|
+ port = action_node.get("port")
|
|
|
+ if not port:
|
|
|
+ event_id = namespace.assign_event_id(name)
|
|
|
+ return RaiseInternalEvent(name=name, parameters=[], event_id=event_id)
|
|
|
else:
|
|
|
- return None
|
|
|
-
|
|
|
- initial = state_node.get("initial", "")
|
|
|
- for xml_child in state_node.getchildren():
|
|
|
- child = load_state(xml_child) # may throw
|
|
|
- if child:
|
|
|
- state.addChild(child)
|
|
|
- if child.short_name == initial:
|
|
|
- state.default_state = child
|
|
|
- if not initial and len(state.children) == 1:
|
|
|
- state.default_state = state.children[0]
|
|
|
-
|
|
|
- for xml_t in state_node.findall("transition", state_node.nsmap):
|
|
|
- transition_nodes.append((xml_t, state))
|
|
|
-
|
|
|
- # Parse enter/exit actions
|
|
|
- def _get_enter_exit(tag, setter):
|
|
|
- node = state_node.find(tag, state_node.nsmap)
|
|
|
- if node is not None:
|
|
|
- actions = load_actions(node)
|
|
|
- setter(actions)
|
|
|
-
|
|
|
- _get_enter_exit("onentry", state.setEnter)
|
|
|
- _get_enter_exit("onexit", state.setExit)
|
|
|
-
|
|
|
- return state
|
|
|
-
|
|
|
- # Build tree structure
|
|
|
- tree_node = sc_node.find("tree")
|
|
|
- root_node = tree_node.find("state")
|
|
|
- root = load_state(root_node)
|
|
|
-
|
|
|
- # Add transitions
|
|
|
- next_after_id = 0
|
|
|
- for t_node, source in transition_nodes:
|
|
|
- # Parse and find target state
|
|
|
- target_string = t_node.get("target", "")
|
|
|
- parse_tree = parser.parse(target_string, start="state_ref")
|
|
|
- def find_state(sequence) -> State:
|
|
|
- if sequence.data == "relative_path":
|
|
|
- el = source
|
|
|
- elif sequence.data == "absolute_path":
|
|
|
- el = root
|
|
|
- for item in sequence.children:
|
|
|
- if item.type == "PARENT_NODE":
|
|
|
- el = el.parent
|
|
|
- elif item.type == "CURRENT_NODE":
|
|
|
- continue
|
|
|
- elif item.type == "IDENTIFIER":
|
|
|
- el = [x for x in el.children if x.short_name == item.value][0]
|
|
|
- return el
|
|
|
- targets = [find_state(seq) for seq in parse_tree.children]
|
|
|
-
|
|
|
- transition = Transition(source, targets)
|
|
|
-
|
|
|
- # Trigger
|
|
|
- event = t_node.get("event")
|
|
|
- port = t_node.get("port")
|
|
|
- after = t_node.get("after")
|
|
|
- if after is not None:
|
|
|
- event = "_after%d" % next_after_id # transition gets unique event name
|
|
|
- next_after_id += 1
|
|
|
- trigger = AfterTrigger(event_namespace.assign_id(event), event, Timestamp(after))
|
|
|
- elif event is not None:
|
|
|
- trigger = Trigger(event_namespace.assign_id(event), event, port)
|
|
|
- if port not in model.inports:
|
|
|
- model.inports.append(port)
|
|
|
+ namespace.add_outport(port)
|
|
|
+ return RaiseOutputEvent(name=name, parameters=[], outport=port, time_offset=0)
|
|
|
+ else:
|
|
|
+ raise None
|
|
|
+
|
|
|
+ # parent_node: XML node containing any number of action nodes as direct children
|
|
|
+ def load_actions(parent_node) -> List[Action]:
|
|
|
+ return list(filter(lambda x: x is not None, map(lambda child: load_action(child), parent_node)))
|
|
|
+
|
|
|
+ transition_nodes: List[Tuple[Any, State]] = [] # List of (<transition>, State) tuples
|
|
|
+
|
|
|
+ # Recursively create state hierarchy from XML node
|
|
|
+ # Adding <transition> elements to the 'transitions' list as a side effect
|
|
|
+ def load_state(state_node) -> Optional[State]:
|
|
|
+ state = None
|
|
|
+ name = state_node.get("id", "")
|
|
|
+ tag = ET.QName(state_node).localname
|
|
|
+ if tag == "state":
|
|
|
+ state = State(name)
|
|
|
+ elif tag == "parallel" :
|
|
|
+ state = ParallelState(name)
|
|
|
+ elif tag == "history":
|
|
|
+ is_deep = state_node.get("type", "shallow") == "deep"
|
|
|
+ if is_deep:
|
|
|
+ state = DeepHistoryState(name)
|
|
|
else:
|
|
|
- trigger = None
|
|
|
- transition.setTrigger(trigger)
|
|
|
- # Actions
|
|
|
- actions = load_actions(t_node)
|
|
|
- transition.setActions(actions)
|
|
|
- # Guard
|
|
|
- cond = t_node.get("cond")
|
|
|
- if cond is not None:
|
|
|
- parse_tree = parser.parse(cond, start="expr")
|
|
|
- # print(parse_tree)
|
|
|
- # print(parse_tree.pretty())
|
|
|
- cond_expr = load_expression(parse_tree)
|
|
|
- transition.setGuard(cond_expr)
|
|
|
- source.addTransition(transition)
|
|
|
-
|
|
|
- # Calculate stuff like list of ancestors, descendants, etc.
|
|
|
- # Also get depth-first ordered lists of states and transitions (by source)
|
|
|
- states: Dict[str, State] = {}
|
|
|
- state_list: List[State] = []
|
|
|
- transition_list: List[Transition] = []
|
|
|
- root.init_tree(0, "", states, state_list, transition_list)
|
|
|
-
|
|
|
- for t in transition_list:
|
|
|
- t.optimize()
|
|
|
-
|
|
|
- # Semantics - We use reflection to find the xml attribute names and values
|
|
|
- semantics_node = sc_node.find("semantics")
|
|
|
- semantics = SemanticConfiguration()
|
|
|
- load_semantics(semantics_node, semantics)
|
|
|
-
|
|
|
- # TODO: process datamodel node
|
|
|
- datamodel_node = sc_node.find("datamodel")
|
|
|
-
|
|
|
- statechart = Statechart(root=root, states=states, state_list=state_list, transition_list=transition_list, semantics=semantics)
|
|
|
-
|
|
|
- model.classes[name] = statechart
|
|
|
- if default:
|
|
|
- model.default_class = name
|
|
|
- return statechart
|
|
|
-
|
|
|
- # Start of function:
|
|
|
- src = sc_node.get("src")
|
|
|
- if src is None:
|
|
|
- _load(sc_node)
|
|
|
- else:
|
|
|
- external_sc_node = ET.parse(os.path.join(dir, src)).getroot()
|
|
|
- statechart = _load(external_sc_node)
|
|
|
-
|
|
|
- semantics_node = sc_node.find("override-semantics")
|
|
|
- load_semantics(semantics_node, statechart.semantics)
|
|
|
-
|
|
|
-def load_semantics(semantics_node, semantics: SemanticConfiguration):
|
|
|
+ state = ShallowHistoryState(name)
|
|
|
+ else:
|
|
|
+ return None
|
|
|
+
|
|
|
+ initial = state_node.get("initial", "")
|
|
|
+ for xml_child in state_node.getchildren():
|
|
|
+ child = load_state(xml_child) # may throw
|
|
|
+ if child:
|
|
|
+ state.addChild(child)
|
|
|
+ if child.short_name == initial:
|
|
|
+ state.default_state = child
|
|
|
+ if not initial and len(state.children) == 1:
|
|
|
+ state.default_state = state.children[0]
|
|
|
+
|
|
|
+ for xml_t in state_node.findall("transition", state_node.nsmap):
|
|
|
+ transition_nodes.append((xml_t, state))
|
|
|
+
|
|
|
+ # Parse enter/exit actions
|
|
|
+ def _get_enter_exit(tag, setter):
|
|
|
+ node = state_node.find(tag, state_node.nsmap)
|
|
|
+ if node is not None:
|
|
|
+ actions = load_actions(node)
|
|
|
+ setter(actions)
|
|
|
+
|
|
|
+ _get_enter_exit("onentry", state.setEnter)
|
|
|
+ _get_enter_exit("onexit", state.setExit)
|
|
|
+
|
|
|
+ return state
|
|
|
+
|
|
|
+ # Build tree structure
|
|
|
+ root_node = tree_node.find("state")
|
|
|
+ root = load_state(root_node)
|
|
|
+
|
|
|
+ # Add transitions
|
|
|
+ next_after_id = 0
|
|
|
+ for t_node, source in transition_nodes:
|
|
|
+ # Parse and find target state
|
|
|
+ target_string = t_node.get("target", "")
|
|
|
+ parse_tree = parser.parse(target_string, start="state_ref")
|
|
|
+ def find_state(sequence) -> State:
|
|
|
+ if sequence.data == "relative_path":
|
|
|
+ el = source
|
|
|
+ elif sequence.data == "absolute_path":
|
|
|
+ el = root
|
|
|
+ for item in sequence.children:
|
|
|
+ if item.type == "PARENT_NODE":
|
|
|
+ el = el.parent
|
|
|
+ elif item.type == "CURRENT_NODE":
|
|
|
+ continue
|
|
|
+ elif item.type == "IDENTIFIER":
|
|
|
+ el = [x for x in el.children if x.short_name == item.value][0]
|
|
|
+ return el
|
|
|
+ targets = [find_state(seq) for seq in parse_tree.children]
|
|
|
+
|
|
|
+ transition = Transition(source, targets)
|
|
|
+
|
|
|
+ # Trigger
|
|
|
+ name = t_node.get("event")
|
|
|
+ port = t_node.get("port")
|
|
|
+ after = t_node.get("after")
|
|
|
+ if after is not None:
|
|
|
+ name = "_after%d" % next_after_id # transition gets unique event name
|
|
|
+ next_after_id += 1
|
|
|
+ trigger = AfterTrigger(namespace.assign_event_id(name), name, Timestamp(after))
|
|
|
+ elif name is not None:
|
|
|
+ trigger = Trigger(namespace.assign_event_id(name), name, port)
|
|
|
+ namespace.add_inport(port)
|
|
|
+ else:
|
|
|
+ trigger = None
|
|
|
+ transition.setTrigger(trigger)
|
|
|
+ # Actions
|
|
|
+ actions = load_actions(t_node)
|
|
|
+ transition.setActions(actions)
|
|
|
+ # Guard
|
|
|
+ cond = t_node.get("cond")
|
|
|
+ if cond is not None:
|
|
|
+ parse_tree = parser.parse(cond, start="expr")
|
|
|
+ # print(parse_tree)
|
|
|
+ # print(parse_tree.pretty())
|
|
|
+ cond_expr = load_expression(parse_tree)
|
|
|
+ transition.setGuard(cond_expr)
|
|
|
+ source.addTransition(transition)
|
|
|
+
|
|
|
+ # Calculate stuff like list of ancestors, descendants, etc.
|
|
|
+ # Also get depth-first ordered lists of states and transitions (by source)
|
|
|
+ states: Dict[str, State] = {}
|
|
|
+ state_list: List[State] = []
|
|
|
+ transition_list: List[Transition] = []
|
|
|
+ root.init_tree(0, "", states, state_list, transition_list)
|
|
|
+
|
|
|
+ for t in transition_list:
|
|
|
+ t.optimize()
|
|
|
+
|
|
|
+ return StateTree(root=root, states=states, state_list=state_list, transition_list=transition_list)
|
|
|
+
|
|
|
+# Namespace is required for building event namespace and in/outport discovery.
|
|
|
+def load_statechart(namespace: ModelNamespace, sc_node) -> Statechart:
|
|
|
+ tree_node = sc_node.find("tree")
|
|
|
+ state_tree = load_state_tree(namespace, tree_node)
|
|
|
+
|
|
|
+ semantics_node = sc_node.find("semantics")
|
|
|
+ semantics = SemanticConfiguration() # start with default semantics
|
|
|
+ load_semantics(semantics, semantics_node)
|
|
|
+
|
|
|
+ datamodel_node = sc_node.find("datamodel")
|
|
|
+ # TODO: process datamodel node
|
|
|
+
|
|
|
+ return Statechart(tree=state_tree, semantics=semantics)
|
|
|
+
|
|
|
+def load_semantics(semantics: SemanticConfiguration, semantics_node):
|
|
|
if semantics_node is not None:
|
|
|
# Use reflection to find the possible XML attributes and their values
|
|
|
for aspect in dataclasses.fields(SemanticConfiguration):
|
|
|
key = semantics_node.get(aspect.name)
|
|
|
if key is not None:
|
|
|
- value = aspect.type[key.upper()]
|
|
|
- setattr(semantics, aspect.name, value)
|
|
|
+ if key == "*":
|
|
|
+ setattr(semantics, aspect.name, None)
|
|
|
+ else:
|
|
|
+ value = aspect.type[key.upper()]
|
|
|
+ setattr(semantics, aspect.name, value)
|
|
|
+
|
|
|
+# Returned list contains more than one test if the semantic configuration contains wildcard values.
|
|
|
+def load_test(src_file) -> List[Test]:
|
|
|
+ namespace = ModelNamespace()
|
|
|
|
|
|
-def load_test(src_file) -> Test:
|
|
|
- # We'll create a model with one statechart
|
|
|
- model = Model()
|
|
|
test_node = ET.parse(src_file).getroot()
|
|
|
sc_node = test_node.find("statechart")
|
|
|
- load_statechart(model, os.path.dirname(src_file), sc_node, name="??", default=True)
|
|
|
+ src = sc_node.get("src")
|
|
|
+ if src is None:
|
|
|
+ statechart = load_statechart(namespace, sc_node)
|
|
|
+ else:
|
|
|
+ external_node = ET.parse(os.path.join(os.path.dirname(src_file), src)).getroot()
|
|
|
+ statechart = load_statechart(namespace, external_node)
|
|
|
+ semantics_node = sc_node.find("override_semantics")
|
|
|
+ load_semantics(statechart.semantics, semantics_node)
|
|
|
|
|
|
input_node = test_node.find("input")
|
|
|
output_node = test_node.find("output")
|
|
|
-
|
|
|
input = load_input(input_node)
|
|
|
output = load_output(output_node)
|
|
|
|
|
|
- return Test(src_file, model, input, output)
|
|
|
+ def variant_description(i, variant) -> str:
|
|
|
+ if not variant:
|
|
|
+ return ""
|
|
|
+ return " (variant %d: %s)" % (i, ",".join(str(val) for val in variant.values()))
|
|
|
+
|
|
|
+ return [
|
|
|
+ Test(
|
|
|
+ src_file + variant_description(i, variant),
|
|
|
+ SingleInstanceModel(
|
|
|
+ namespace,
|
|
|
+ Statechart(tree=statechart.tree, semantics=dataclasses.replace(statechart.semantics, **variant))),
|
|
|
+ input,
|
|
|
+ output)
|
|
|
+ for i, variant in enumerate(statechart.semantics.wildcard_cart_product())
|
|
|
+ ]
|
|
|
|
|
|
def load_input(input_node) -> TestInput:
|
|
|
input = []
|