|
@@ -6,7 +6,7 @@ from lark import Lark
|
|
|
import sccd.compiler
|
|
|
from sccd.runtime.statechart_syntax import *
|
|
|
from sccd.runtime.event import Event
|
|
|
-from sccd.runtime.semantic_options import *
|
|
|
+from sccd.runtime.semantic_options import SemanticConfiguration
|
|
|
|
|
|
schema_path = os.path.join(
|
|
|
os.path.dirname(sccd.compiler.__file__),
|
|
@@ -15,14 +15,16 @@ schema_path = os.path.join(
|
|
|
schema = ET.XMLSchema(ET.parse(schema_path))
|
|
|
|
|
|
grammar = open(os.path.join(os.path.dirname(os.path.abspath(__file__)),"grammar.g"))
|
|
|
-l = Lark(grammar)
|
|
|
+l = Lark(grammar, parser="lalr", start=["target_expr"])
|
|
|
|
|
|
+
|
|
|
+# Some types immitating the types that are produced by the compiler
|
|
|
@dataclass
|
|
|
class Statechart:
|
|
|
_class: Any
|
|
|
root: State
|
|
|
states: Dict[str, State]
|
|
|
- semantics: SemanticOptions
|
|
|
+ semantics: SemanticConfiguration
|
|
|
|
|
|
@dataclass
|
|
|
class Class:
|
|
@@ -36,11 +38,19 @@ class Model:
|
|
|
classes: Dict[str, Any]
|
|
|
default_class: str
|
|
|
|
|
|
+@dataclass
|
|
|
+class InputEvent:
|
|
|
+ name: str
|
|
|
+ port: str
|
|
|
+ parameters: List[Any]
|
|
|
+ time_offset: Timestamp
|
|
|
+
|
|
|
@dataclass
|
|
|
class Test:
|
|
|
- input_events: List[Any]
|
|
|
+ input_events: List[InputEvent]
|
|
|
expected_events: List[Event]
|
|
|
|
|
|
+
|
|
|
def load_model(src_file) -> Tuple[Model, Optional[Test]]:
|
|
|
tree = ET.parse(src_file)
|
|
|
schema.assertValid(tree)
|
|
@@ -52,29 +62,36 @@ def load_model(src_file) -> Tuple[Model, Optional[Test]]:
|
|
|
for c in classes:
|
|
|
class_name = c.get("name")
|
|
|
default = c.get("default", "")
|
|
|
+
|
|
|
scxml_node = c.find("scxml", root.nsmap)
|
|
|
root_state, states = load_tree(scxml_node)
|
|
|
+
|
|
|
# Semantics - We use reflection to find the xml attribute names and values
|
|
|
- semantics = SemanticOptions()
|
|
|
- for aspect in dataclasses.fields(SemanticOptions):
|
|
|
+ semantics = SemanticConfiguration()
|
|
|
+ for aspect in dataclasses.fields(SemanticConfiguration):
|
|
|
key = scxml_node.get(aspect.name)
|
|
|
if key is not None:
|
|
|
value = aspect.type[key.upper()]
|
|
|
setattr(semantics, aspect.name, value)
|
|
|
- print(semantics)
|
|
|
+
|
|
|
class_ = Class(class_name, None)
|
|
|
statechart = Statechart(class_, root_state, states, semantics)
|
|
|
class_.statechart = statechart
|
|
|
+
|
|
|
model.classes[class_name] = lambda: class_
|
|
|
if default:
|
|
|
model.default_class = class_name
|
|
|
|
|
|
- transitions = root.findall(".//transition", root.nsmap)
|
|
|
- for t in transitions:
|
|
|
- port = t.get("port", "")
|
|
|
- if port != "" and port not in model.outports:
|
|
|
- print("found port", port)
|
|
|
- model.outports.append(port)
|
|
|
+ def find_ports(element_path, collection):
|
|
|
+ elements = root.findall(element_path, root.nsmap)
|
|
|
+ for e in elements:
|
|
|
+ port = e.get("port")
|
|
|
+ if port != None and port not in collection:
|
|
|
+ collection.append(port)
|
|
|
+ # Any 'port' attribute of a <transition> element is an input port
|
|
|
+ find_ports(".//transition", model.inports)
|
|
|
+ # Any 'port' attribute of a <raise> element is an output port
|
|
|
+ find_ports(".//raise", model.outports)
|
|
|
|
|
|
test = None
|
|
|
test_node = root.find(".//test", root.nsmap)
|
|
@@ -82,8 +99,12 @@ def load_model(src_file) -> Tuple[Model, Optional[Test]]:
|
|
|
input_events = []
|
|
|
expected_events = []
|
|
|
input_node = test_node.find("input", root.nsmap)
|
|
|
- if input_node:
|
|
|
- pass
|
|
|
+ if input_node is not None:
|
|
|
+ for event_node in input_node:
|
|
|
+ name = event_node.get("name")
|
|
|
+ port = event_node.get("port")
|
|
|
+ time = int(event_node.get("time"))
|
|
|
+ input_events.append(InputEvent(name, port, [], time))
|
|
|
slots = test_node.findall("expected/slot", root.nsmap)
|
|
|
for s in slots:
|
|
|
slot = []
|
|
@@ -98,14 +119,14 @@ def load_model(src_file) -> Tuple[Model, Optional[Test]]:
|
|
|
|
|
|
return (model, test)
|
|
|
|
|
|
+class InvalidTag(Exception):
|
|
|
+ pass
|
|
|
|
|
|
def load_tree(scxml_node) -> Tuple[State, Dict[str, State]]:
|
|
|
|
|
|
states: Dict[str, State] = {}
|
|
|
transitions: List[Tuple[Any, State]] = [] # List of (<transition>, State) tuples
|
|
|
|
|
|
- class InvalidTag(Exception):
|
|
|
- pass
|
|
|
|
|
|
# Recursively create state hierarchy from XML node
|
|
|
# Adding <transition> elements to the 'transitions' list as a side effect
|
|
@@ -134,13 +155,24 @@ def load_tree(scxml_node) -> Tuple[State, Dict[str, State]]:
|
|
|
if child.short_name == initial:
|
|
|
state.default_state = child
|
|
|
except InvalidTag:
|
|
|
- pass
|
|
|
+ pass # skip non-state tags
|
|
|
|
|
|
if not initial and len(state.children) == 1:
|
|
|
state.default_state = state.children[0]
|
|
|
|
|
|
for xml_t in xml_node.findall("transition", xml_node.nsmap):
|
|
|
transitions.append((xml_t, state))
|
|
|
+
|
|
|
+ # Parse enter/exit actions
|
|
|
+ def _get_enter_exit(tag, setter):
|
|
|
+ node = xml_node.find(tag, xml_node.nsmap)
|
|
|
+ if node is not None:
|
|
|
+ actions = load_actions(node)
|
|
|
+ setter(actions)
|
|
|
+
|
|
|
+ _get_enter_exit("onentry", state.setEnter)
|
|
|
+ _get_enter_exit("onexit", state.setExit)
|
|
|
+
|
|
|
return state
|
|
|
|
|
|
# First build a state tree
|
|
@@ -149,9 +181,9 @@ def load_tree(scxml_node) -> Tuple[State, Dict[str, State]]:
|
|
|
|
|
|
# Add transitions
|
|
|
for xml_t, source in transitions:
|
|
|
+ # Parse and find target state
|
|
|
target_string = xml_t.get("target", "")
|
|
|
parse_tree = l.parse(target_string, start="target_expr")
|
|
|
-
|
|
|
def find_state(sequence) -> State:
|
|
|
if sequence.data == "relative_path":
|
|
|
el = source
|
|
@@ -165,13 +197,44 @@ def load_tree(scxml_node) -> Tuple[State, Dict[str, State]]:
|
|
|
elif item.type == "IDENTIFIER":
|
|
|
el = [x for x in el.children if x.short_name == item.value][0]
|
|
|
return el
|
|
|
-
|
|
|
targets = [find_state(seq) for seq in parse_tree.children]
|
|
|
+
|
|
|
transition = Transition(source, targets)
|
|
|
+
|
|
|
+ # Trigger
|
|
|
+ event = xml_t.get("event")
|
|
|
+ port = xml_t.get("port")
|
|
|
+ trigger = None if event == None else Trigger(event, port)
|
|
|
+ transition.setTrigger(trigger)
|
|
|
+ # Actions
|
|
|
+ actions = load_actions(xml_t)
|
|
|
+ transition.setActions(actions)
|
|
|
# todo: set guard
|
|
|
- # todo: set trigger
|
|
|
- transition.setTrigger(None)
|
|
|
- # todo: set actions
|
|
|
+
|
|
|
source.addTransition(transition)
|
|
|
|
|
|
return (root, states)
|
|
|
+
|
|
|
+def load_action(action_node) -> Optional[Action]:
|
|
|
+ tag = ET.QName(action_node).localname
|
|
|
+ if tag == "raise":
|
|
|
+ event = action_node.get("event")
|
|
|
+ port = action_node.get("port")
|
|
|
+ if not port:
|
|
|
+ return RaiseInternalEvent(name=event, parameters=[])
|
|
|
+ else:
|
|
|
+ return RaiseOutputEvent(name=event, parameters=[], outport=port, time_offset=0)
|
|
|
+ else:
|
|
|
+ raise InvalidTag()
|
|
|
+
|
|
|
+# parent_node: XML node containing 0 or more action nodes as direct children
|
|
|
+def load_actions(parent_node) -> List[Action]:
|
|
|
+ actions = []
|
|
|
+ for node in parent_node:
|
|
|
+ try:
|
|
|
+ a = load_action(node)
|
|
|
+ if a:
|
|
|
+ actions.append(a)
|
|
|
+ except InvalidTag:
|
|
|
+ pass # skip non-action tags
|
|
|
+ return actions
|