123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342 |
- from typing import *
- import re
- from lark.exceptions import *
- from sccd.statechart.static.types import *
- from sccd.statechart.static.statechart import *
- from sccd.statechart.static.tree import *
- from sccd.statechart.dynamic.builtin_scope import *
- from sccd.util.xml_parser import *
- from sccd.statechart.parser.text import *
- from sccd.statechart.static.in_state import InStateMacroExpansion
- class SkipFile(Exception):
- pass
- parse_f = functools.partial(parse, decorate_exceptions=(ModelStaticError,LarkError))
- def check_duration_type(type):
- if type != SCCDDuration:
- msg = "Expression is '%s' type. Expected 'Duration' type." % str(type)
- if type == SCCDInt:
- msg += "\n Hint: Did you forget a duration unit suffix? ('s', 'ms', ...)"
- raise XmlError(msg)
- # path: filesystem path for finding external statecharts
- def statechart_parser_rules(globals, path, load_external = True, parse_f = parse_f, text_parser=TextParser(globals)) -> Rules:
- import os
- def parse_statechart(el):
- ext_file = el.get("src")
- if ext_file is None:
- statechart = Statechart(
- semantics=SemanticConfiguration(),
- scope=Scope("statechart", parent=BuiltIn),
- datamodel=None,
- internal_events=Bitmap(),
- internally_raised_events=Bitmap(),
- inport_events={},
- event_outport={},
- tree=None,
- )
- else:
- if not load_external:
- raise SkipFile("Parser configured not to load statecharts from external files.")
- statechart = parse_f(os.path.join(path, ext_file), [("statechart", statechart_parser_rules(globals, path, load_external=False, parse_f=parse_f, text_parser=text_parser))])
- def parse_semantics(el):
- available_aspects = SemanticConfiguration.get_fields()
- for aspect_name, text in el.attrib.items():
- try:
- aspect_type = available_aspects[aspect_name]
- except KeyError:
- raise XmlError("invalid semantic aspect: '%s'" % aspect_name)
- result = text_parser.parse_semantic_choice(text)
- if result.data == "wildcard":
- semantic_choice = list(aspect_type) # all options
- elif result.data == "list":
- semantic_choice = [aspect_type[token.value.upper()] for token in result.children]
- if len(semantic_choice) == 1:
- semantic_choice = semantic_choice[0]
- setattr(statechart.semantics, aspect_name, semantic_choice)
- def parse_datamodel(el):
- body = text_parser.parse_stmt(el.text)
- body.init_stmt(statechart.scope)
- statechart.datamodel = body
- def parse_inport(el):
- port_name = require_attribute(el, "name")
- globals.inports.assign_id(port_name)
- def parse_event(el):
- event_name = require_attribute(el, "name")
- event_id = globals.events.assign_id(event_name)
- port_events = statechart.inport_events.setdefault(port_name, Bitmap())
- port_events |= bit(event_id)
- statechart.inport_events[port_name] = port_events
- statechart.internal_events |= bit(event_id)
- return [("event+", parse_event)]
- def parse_outport(el):
- port_name = require_attribute(el, "name")
- def parse_event(el):
- event_name = require_attribute(el, "name")
- statechart.event_outport[event_name] = port_name
- return [("event+", parse_event)]
- def parse_root(el):
- if el.get("id") is not None:
- raise XmlError("<root> state must not have 'id' attribute.")
- root = State("", parent=None)
- children_dict = {}
- transitions = [] # All of the statechart's transitions accumulate here, cause we still need to find their targets, which we can't do before the entire state tree has been built. We find their targets when encoutering the </root> closing tag.
- after_id = 0 # After triggers need unique IDs within the scope of the statechart model
- refs_to_resolve = [] # Transition targets and INSTATE arguments. Resolved after constructing state tree.
- def get_default_state(el, state, children_dict):
- have_initial = False
- def parse_attr_initial(initial):
- nonlocal default_state
- nonlocal have_initial
- default_state = None
- have_initial = True
- try:
- default_state = children_dict[initial]
- except KeyError as e:
- raise XmlError("Not a child.") from e
- if_attribute(el, "initial", parse_attr_initial)
- if not have_initial:
- if len(state.children) == 1:
- default_state = state.children[0]
- else:
- raise XmlError("More than 1 child state: must set 'initial' attribute.")
- return default_state
- def state_child_rules(parent, sibling_dict: Dict[str, State]):
- def macro_in_state(params):
- if len(params) != 1:
- raise XmlError("Macro @in: Expected 1 parameter")
- ref= StateRef(source=parent, path=text_parser.parse_path(params[0].string))
- refs_to_resolve.append(ref)
- return InStateMacroExpansion(ref=ref)
- text_parser.parser.options.transformer.set_macro("@in", macro_in_state)
- # A transition's guard expression and action statements can read the transition's event parameters, and also possibly the current state configuration. We therefore now wrap these into a function with a bunch of parameters for those values that we want to bring into scope.
- def wrap_transition_params(expr_or_stmt, trigger: Trigger):
- if isinstance(expr_or_stmt, Statement):
- # Transition's action code
- body = expr_or_stmt
- elif isinstance(expr_or_stmt, Expression):
- # Transition's guard
- body = ReturnStatement(expr=expr_or_stmt)
- else:
- raise Exception("Unexpected error in parser")
- # The joy of writing expressions in abstract syntax:
- wrapped = FunctionDeclaration(
- params_decl=
- # The param '@conf' (which, on purpose, is an illegal identifier in textual concrete syntax, to prevent naming collisions) will contain the statechart's configuration as a bitmap (SCCDInt). This parameter is currently only used in the expansion of the INSTATE-macro.
- [ParamDecl(name="@conf", formal_type=SCCDStateConfiguration(state=parent))]
- # Plus all the parameters of the enabling events of the transition's trigger:
- + [param for event in trigger.enabling for param in event.params_decl],
- body=body)
- return wrapped
- def actions_rules(scope, wrap_trigger: Trigger = EMPTY_TRIGGER):
- def parse_raise(el):
- params = []
- def parse_param(el):
- expr_text = require_attribute(el, "expr")
- expr = text_parser.parse_expr(expr_text)
- function = wrap_transition_params(expr, trigger=wrap_trigger)
- function.init_expr(scope)
- function.scope.name = "event_param"
- params.append(function)
- def finish_raise():
- event_name = require_attribute(el, "event")
- try:
- port = statechart.event_outport[event_name]
- except KeyError:
- # Legacy fallback: read port from attribute
- port = el.get("port")
- if port is None:
- # internal event
- event_id = globals.events.assign_id(event_name)
- statechart.internally_raised_events |= bit(event_id)
- return RaiseInternalEvent(event_id=event_id, name=event_name, params=params)
- else:
- # output event - no ID in global namespace
- statechart.event_outport[event_name] = port
- globals.outports.assign_id(port)
- return RaiseOutputEvent(name=event_name, params=params, outport=port)
- return ([("param*", parse_param)], finish_raise)
- def parse_code(el):
- def finish_code():
- block = text_parser.parse_stmt(el.text)
- function = wrap_transition_params(block, trigger=wrap_trigger)
- function.init_expr(scope)
- function.scope.name = "code"
- return Code(function)
- return ([], finish_code)
- return {"raise": parse_raise, "code": parse_code}
- def common(el, constructor):
- short_name = require_attribute(el, "id")
- match = re.match("[A-Za-z_][A-Za-z_0-9]*", short_name)
- if match is None or match[0] != short_name:
- raise XmlError("invalid id")
- state = constructor(short_name, parent)
- already_there = sibling_dict.setdefault(short_name, state)
- if already_there is not state:
- raise XmlError("Sibling state with the same id exists.")
- return state
- def common_nonpseudo(el, constructor):
- state = common(el, constructor)
- if el.get("stable", "") == "true":
- state.stable = True
- return state
- def parse_state(el):
- state = common_nonpseudo(el, State)
- children_dict = {}
- def finish_state():
- if len(state.children) > 0:
- state.type = OrState(state=state,
- default_state=get_default_state(el, state, children_dict))
- else:
- state.type = AndState(state=state)
- return (state_child_rules(parent=state, sibling_dict=children_dict), finish_state)
- def parse_parallel(el):
- state = common_nonpseudo(el, State)
- state.type = AndState(state=state)
- return state_child_rules(parent=state, sibling_dict={})
- def parse_history(el):
- history_type = el.get("type", "shallow")
- if history_type == "deep":
- state = common(el, DeepHistoryState)
- elif history_type == "shallow":
- state = common(el, ShallowHistoryState)
- else:
- raise XmlError("attribute 'type' must be \"shallow\" or \"deep\".")
- def parse_onentry(el):
- def finish_onentry(*actions):
- parent.enter = actions
- return (actions_rules(scope=statechart.scope), finish_onentry)
- def parse_onexit(el):
- def finish_onexit(*actions):
- parent.exit = actions
- return (actions_rules(scope=statechart.scope), finish_onexit)
- def parse_transition(el):
- if parent is root:
- raise XmlError("Root cannot be source of a transition.")
- target_string = require_attribute(el, "target")
- try:
- path = text_parser.parse_path(target_string)
- except Exception as e:
- raise XmlErrorElement(t_el, "Parsing target '%s': %s" % (transition.target_string, str(e))) from e
- transition = Transition(source=parent, path=path)
- refs_to_resolve.append(transition)
- have_event_attr = False
- def parse_attr_event(event):
- nonlocal have_event_attr
- have_event_attr = True
- positive_events, negative_events = text_parser.parse_events_decl(event)
- # Optimization: sort events by ID
- # Allows us to save time later.
- positive_events.sort(key=lambda e: e.id)
- if not negative_events:
- transition.trigger = Trigger(positive_events)
- statechart.internal_events |= transition.trigger.enabling_bitmap
- else:
- transition.trigger = NegatedTrigger(positive_events, negative_events)
- statechart.internal_events |= transition.trigger.enabling_bitmap
- def parse_attr_after(after):
- nonlocal after_id
- if have_event_attr:
- raise XmlError("Cannot specify 'after' and 'event' at the same time.")
- after_expr = text_parser.parse_expr(after)
- after_type = after_expr.init_expr(statechart.scope)
- check_duration_type(after_type)
- # After-events should only be generated by the runtime.
- # By putting a '+' in the event name (which isn't an allowed character in the parser), we ensure that the user will never accidentally (careless) or purposefully (evil) generate a valid after-event.
- event_name = "+%d" % after_id
- transition.trigger = AfterTrigger(globals.events.assign_id(event_name), event_name, after_id, after_expr)
- statechart.internal_events |= transition.trigger.enabling_bitmap
- after_id += 1
- def parse_attr_cond(cond):
- # Transition's guard expression
- guard_expr = text_parser.parse_expr(cond)
- guard_function = wrap_transition_params(guard_expr, transition.trigger)
- guard_type = guard_function.init_expr(statechart.scope)
- guard_function.scope.name = "guard"
- if guard_type.return_type is not SCCDBool:
- raise XmlError("Guard should be an expression evaluating to 'bool'.")
- transition.guard = guard_function
- if_attribute(el, "event", parse_attr_event)
- if_attribute(el, "after", parse_attr_after)
- if_attribute(el, "cond", parse_attr_cond)
- def finish_transition(*actions):
- transition.actions = actions
- transitions.append((transition, el))
- parent.transitions.append(transition)
- return (actions_rules(scope=statechart.scope, wrap_trigger=transition.trigger), finish_transition)
- def finish_state_child_rules():
- text_parser.parser.options.transformer.unset_macro("@in")
- return ({"state": parse_state, "parallel": parse_parallel, "history": parse_history, "onentry": parse_onentry, "onexit": parse_onexit, "transition": parse_transition}, finish_state_child_rules)
- def finish_root():
- root.type = OrState(state=root, default_state=get_default_state(el, root, children_dict))
- # State tree has been constructed, we can now resolve state refs:
- for ref in refs_to_resolve:
- try:
- ref.resolve(root=root)
- except PathError as e:
- raise XmlErrorElement(t_el, "target=\"%s\": %s" % (transition.target_string, str(e))) from e
- # Next, visit tree to statically calculate many properties of states and transitions:
- statechart.tree = StateTree(root)
- return (state_child_rules(root, sibling_dict=children_dict), finish_root)
- def finish_statechart():
- return statechart
- if ext_file is None:
- return ([("semantics?", parse_semantics), ("datamodel?", parse_datamodel), ("inport*", parse_inport), ("outport*", parse_outport), ("root", parse_root)], finish_statechart)
- else:
- return ([("override_semantics?", parse_semantics)], finish_statechart)
- return parse_statechart
|