|
@@ -0,0 +1,240 @@
|
|
|
+import os
|
|
|
+import lxml.etree as ET
|
|
|
+from lark import Lark
|
|
|
+from sccd.runtime.test import *
|
|
|
+from sccd.runtime.model import *
|
|
|
+from sccd.runtime.statechart_syntax import *
|
|
|
+
|
|
|
+import sccd.schema
|
|
|
+schema_dir = os.path.dirname(sccd.schema.__file__)
|
|
|
+
|
|
|
+# Grammar for parsing state references and expressions
|
|
|
+grammar = open(os.path.join(schema_dir,"grammar.g"))
|
|
|
+parser = Lark(grammar, parser="lalr", start=["state_ref", "expr"])
|
|
|
+
|
|
|
+class ParseError(Exception):
|
|
|
+ def __init__(self, msg):
|
|
|
+ self.msg = msg
|
|
|
+
|
|
|
+def load_expression(parse_node) -> Expression:
|
|
|
+ if parse_node.data == "func_call":
|
|
|
+ function = load_expression(parse_node.children[0])
|
|
|
+ parameters = [load_expression(e) for e in parse_node.children[1].children]
|
|
|
+ return FunctionCall(function, parameters)
|
|
|
+ elif parse_node.data == "string":
|
|
|
+ return StringLiteral(parse_node.children[0].value[1:-1])
|
|
|
+ elif parse_node.data == "identifier":
|
|
|
+ return Identifier(parse_node.children[0].value)
|
|
|
+ elif parse_node.data == "array":
|
|
|
+ elements = [load_expression(e) for e in parse_node.children]
|
|
|
+ return Array(elements)
|
|
|
+ raise ParseError("Can't handle expression type: "+parse_node.data)
|
|
|
+
|
|
|
+# A statechart can only be loaded within the context of a model
|
|
|
+def load_statechart(model: Model, dir, sc_node, name="", default: bool = False):
|
|
|
+
|
|
|
+ def _load(sc_node) -> Statechart:
|
|
|
+
|
|
|
+ def load_action(action_node) -> Optional[Action]:
|
|
|
+ tag = ET.QName(action_node).localname
|
|
|
+ if tag == "raise":
|
|
|
+ name = action_node.get("event")
|
|
|
+ port = action_node.get("port")
|
|
|
+ if not port:
|
|
|
+ event_id = model.event_namespace.assign_id(name)
|
|
|
+ return RaiseInternalEvent(name=name, parameters=[], event_id=event_id)
|
|
|
+ else:
|
|
|
+ if port not in model.outports:
|
|
|
+ model.outports.append(port)
|
|
|
+ return RaiseOutputEvent(name=name, parameters=[], outport=port, time_offset=0)
|
|
|
+ else:
|
|
|
+ raise None
|
|
|
+
|
|
|
+ # parent_node: XML node containing any number of action nodes as direct children
|
|
|
+ def load_actions(parent_node) -> List[Action]:
|
|
|
+ return list(filter(lambda x: x is not None, map(lambda child: load_action(child), parent_node)))
|
|
|
+
|
|
|
+ transition_nodes: List[Tuple[Any, State]] = [] # List of (<transition>, State) tuples
|
|
|
+
|
|
|
+ # Recursively create state hierarchy from XML node
|
|
|
+ # Adding <transition> elements to the 'transitions' list as a side effect
|
|
|
+ def load_state(state_node) -> Optional[State]:
|
|
|
+ state = None
|
|
|
+ name = state_node.get("id", "")
|
|
|
+ tag = ET.QName(state_node).localname
|
|
|
+ if tag == "state":
|
|
|
+ state = State(name)
|
|
|
+ elif tag == "parallel" :
|
|
|
+ state = ParallelState(name)
|
|
|
+ elif tag == "history":
|
|
|
+ is_deep = state_node.get("type", "shallow") == "deep"
|
|
|
+ if is_deep:
|
|
|
+ state = DeepHistoryState(name)
|
|
|
+ else:
|
|
|
+ state = ShallowHistoryState(name)
|
|
|
+ else:
|
|
|
+ return None
|
|
|
+
|
|
|
+ initial = state_node.get("initial", "")
|
|
|
+ for xml_child in state_node.getchildren():
|
|
|
+ child = load_state(xml_child) # may throw
|
|
|
+ if child:
|
|
|
+ state.addChild(child)
|
|
|
+ if child.short_name == initial:
|
|
|
+ state.default_state = child
|
|
|
+ if not initial and len(state.children) == 1:
|
|
|
+ state.default_state = state.children[0]
|
|
|
+
|
|
|
+ for xml_t in state_node.findall("transition", state_node.nsmap):
|
|
|
+ transition_nodes.append((xml_t, state))
|
|
|
+
|
|
|
+ # Parse enter/exit actions
|
|
|
+ def _get_enter_exit(tag, setter):
|
|
|
+ node = state_node.find(tag, state_node.nsmap)
|
|
|
+ if node is not None:
|
|
|
+ actions = load_actions(node)
|
|
|
+ setter(actions)
|
|
|
+
|
|
|
+ _get_enter_exit("onentry", state.setEnter)
|
|
|
+ _get_enter_exit("onexit", state.setExit)
|
|
|
+
|
|
|
+ return state
|
|
|
+
|
|
|
+ # Build tree structure
|
|
|
+ tree_node = sc_node.find("tree")
|
|
|
+ root_node = tree_node.find("state")
|
|
|
+ root = load_state(root_node)
|
|
|
+
|
|
|
+ # Add transitions
|
|
|
+ next_after_id = 0
|
|
|
+ for t_node, source in transition_nodes:
|
|
|
+ # Parse and find target state
|
|
|
+ target_string = t_node.get("target", "")
|
|
|
+ parse_tree = parser.parse(target_string, start="state_ref")
|
|
|
+ def find_state(sequence) -> State:
|
|
|
+ if sequence.data == "relative_path":
|
|
|
+ el = source
|
|
|
+ elif sequence.data == "absolute_path":
|
|
|
+ el = root
|
|
|
+ for item in sequence.children:
|
|
|
+ if item.type == "PARENT_NODE":
|
|
|
+ el = el.parent
|
|
|
+ elif item.type == "CURRENT_NODE":
|
|
|
+ continue
|
|
|
+ elif item.type == "IDENTIFIER":
|
|
|
+ el = [x for x in el.children if x.short_name == item.value][0]
|
|
|
+ return el
|
|
|
+ targets = [find_state(seq) for seq in parse_tree.children]
|
|
|
+
|
|
|
+ transition = Transition(source, targets)
|
|
|
+
|
|
|
+ # Trigger
|
|
|
+ event = t_node.get("event")
|
|
|
+ port = t_node.get("port")
|
|
|
+ after = t_node.get("after")
|
|
|
+ if after is not None:
|
|
|
+ event = "_after%d" % next_after_id # transition gets unique event name
|
|
|
+ next_after_id += 1
|
|
|
+ trigger = AfterTrigger(event_namespace.assign_id(event), event, Timestamp(after))
|
|
|
+ elif event is not None:
|
|
|
+ trigger = Trigger(event_namespace.assign_id(event), event, port)
|
|
|
+ if port not in model.inports:
|
|
|
+ model.inports.append(port)
|
|
|
+ else:
|
|
|
+ trigger = None
|
|
|
+ transition.setTrigger(trigger)
|
|
|
+ # Actions
|
|
|
+ actions = load_actions(t_node)
|
|
|
+ transition.setActions(actions)
|
|
|
+ # Guard
|
|
|
+ cond = t_node.get("cond")
|
|
|
+ if cond is not None:
|
|
|
+ parse_tree = parser.parse(cond, start="expr")
|
|
|
+ # print(parse_tree)
|
|
|
+ # print(parse_tree.pretty())
|
|
|
+ cond_expr = load_expression(parse_tree)
|
|
|
+ transition.setGuard(cond_expr)
|
|
|
+ source.addTransition(transition)
|
|
|
+
|
|
|
+ # Calculate stuff like list of ancestors, descendants, etc.
|
|
|
+ # Also get depth-first ordered lists of states and transitions (by source)
|
|
|
+ states: Dict[str, State] = {}
|
|
|
+ state_list: List[State] = []
|
|
|
+ transition_list: List[Transition] = []
|
|
|
+ root.init_tree(0, "", states, state_list, transition_list)
|
|
|
+
|
|
|
+ for t in transition_list:
|
|
|
+ t.optimize()
|
|
|
+
|
|
|
+ # Semantics - We use reflection to find the xml attribute names and values
|
|
|
+ semantics_node = sc_node.find("semantics")
|
|
|
+ semantics = SemanticConfiguration()
|
|
|
+ load_semantics(semantics_node, semantics)
|
|
|
+
|
|
|
+ # TODO: process datamodel node
|
|
|
+ datamodel_node = sc_node.find("datamodel")
|
|
|
+
|
|
|
+ statechart = Statechart(root=root, states=states, state_list=state_list, transition_list=transition_list, semantics=semantics)
|
|
|
+
|
|
|
+ model.classes[name] = statechart
|
|
|
+ if default:
|
|
|
+ model.default_class = name
|
|
|
+ return statechart
|
|
|
+
|
|
|
+ # Start of function:
|
|
|
+ src = sc_node.get("src")
|
|
|
+ if src is None:
|
|
|
+ _load(sc_node)
|
|
|
+ else:
|
|
|
+ external_sc_node = ET.parse(os.path.join(dir, src)).getroot()
|
|
|
+ statechart = _load(external_sc_node)
|
|
|
+
|
|
|
+ semantics_node = sc_node.find("override-semantics")
|
|
|
+ load_semantics(semantics_node, statechart.semantics)
|
|
|
+
|
|
|
+def load_semantics(semantics_node, semantics: SemanticConfiguration):
|
|
|
+ if semantics_node is not None:
|
|
|
+ # Use reflection to find the possible XML attributes and their values
|
|
|
+ for aspect in dataclasses.fields(SemanticConfiguration):
|
|
|
+ key = semantics_node.get(aspect.name)
|
|
|
+ if key is not None:
|
|
|
+ value = aspect.type[key.upper()]
|
|
|
+ setattr(semantics, aspect.name, value)
|
|
|
+
|
|
|
+def load_test(src_file) -> Test:
|
|
|
+ # We'll create a model with one statechart
|
|
|
+ model = Model()
|
|
|
+ test_node = ET.parse(src_file).getroot()
|
|
|
+ sc_node = test_node.find("statechart")
|
|
|
+ load_statechart(model, os.path.dirname(src_file), sc_node, name="??", default=True)
|
|
|
+
|
|
|
+ input_node = test_node.find("input")
|
|
|
+ output_node = test_node.find("output")
|
|
|
+
|
|
|
+ input = load_input(input_node)
|
|
|
+ output = load_output(output_node)
|
|
|
+
|
|
|
+ return Test(src_file, model, input, output)
|
|
|
+
|
|
|
+def load_input(input_node) -> TestInput:
|
|
|
+ input = []
|
|
|
+ if input_node is not None:
|
|
|
+ for event_node in input_node:
|
|
|
+ name = event_node.get("name")
|
|
|
+ port = event_node.get("port")
|
|
|
+ time = int(event_node.get("time"))
|
|
|
+ input.append(InputEvent(name, port, [], time))
|
|
|
+ return input
|
|
|
+
|
|
|
+def load_output(output_node) -> TestOutput:
|
|
|
+ output = []
|
|
|
+ if output_node is not None:
|
|
|
+ for big_step_node in output_node:
|
|
|
+ big_step = []
|
|
|
+ for event_node in big_step_node:
|
|
|
+ name = event_node.get("name")
|
|
|
+ port = event_node.get("port")
|
|
|
+ parameters = [] # todo: read params
|
|
|
+ big_step.append(Event(id=0, name=name, port=port, parameters=parameters))
|
|
|
+ output.append(big_step)
|
|
|
+ return output
|