xml.py 14 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332
  1. from typing import *
  2. import re
  3. from lark.exceptions import *
  4. from sccd.statechart.static.types import *
  5. from sccd.statechart.static.statechart import *
  6. from sccd.statechart.static.tree import *
  7. from sccd.statechart.dynamic.builtin_scope import *
  8. from sccd.util.xml_parser import *
  9. from sccd.statechart.parser.text import *
  10. from sccd.statechart.static.in_state import InStateMacroExpansion
  11. class SkipFile(Exception):
  12. pass
  13. parse_f = functools.partial(parse, decorate_exceptions=(ModelStaticError,LarkError))
  14. def check_duration_type(type):
  15. if type != SCCDDuration:
  16. msg = "Expression is '%s' type. Expected 'Duration' type." % str(type)
  17. if type == SCCDInt:
  18. msg += "\n Hint: Did you forget a duration unit suffix? ('s', 'ms', ...)"
  19. raise XmlError(msg)
  20. # path: filesystem path for finding external statecharts
  21. def statechart_parser_rules(globals, path, load_external = True, parse_f = parse_f, text_parser=TextParser(globals)) -> Rules:
  22. import os
  23. def parse_statechart(el):
  24. ext_file = el.get("src")
  25. if ext_file is None:
  26. statechart = Statechart(scope=Scope("statechart", parent=None))
  27. else:
  28. if not load_external:
  29. raise SkipFile("Parser configured not to load statecharts from external files.")
  30. statechart = parse_f(os.path.join(path, ext_file), [("statechart", statechart_parser_rules(globals, path, load_external=False, parse_f=parse_f, text_parser=text_parser))])
  31. def parse_semantics(el):
  32. available_aspects = SemanticConfiguration.get_fields()
  33. for aspect_name, text in el.attrib.items():
  34. try:
  35. aspect_type = available_aspects[aspect_name]
  36. except KeyError:
  37. raise XmlError("invalid semantic aspect: '%s'" % aspect_name)
  38. result = text_parser.parse_semantic_choice(text)
  39. if result.data == "wildcard":
  40. semantic_choice = list(aspect_type) # all options
  41. elif result.data == "list":
  42. semantic_choice = [aspect_type[token.value.upper()] for token in result.children]
  43. if len(semantic_choice) == 1:
  44. semantic_choice = semantic_choice[0]
  45. setattr(statechart.semantics, aspect_name, semantic_choice)
  46. def parse_datamodel(el):
  47. body = text_parser.parse_block(el.text)
  48. body.init_stmt(statechart.scope)
  49. statechart.datamodel = body
  50. def parse_event_param(el):
  51. type_text = require_attribute(el, "type")
  52. param_type = text_parser.parse_type(type_text)
  53. def finish_param():
  54. return param_type
  55. return ([], finish_param)
  56. def get_port_parser(add_to):
  57. def parse_port(el):
  58. def parse_event(el):
  59. event_name = require_attribute(el, "name")
  60. if event_name in add_to:
  61. raise XmlError("event already declared earlier: %s" % event_name)
  62. def finish_event(*params):
  63. add_to[event_name] = list(params)
  64. return ([("param*", parse_event_param)], finish_event)
  65. return [("event*", parse_event)]
  66. return parse_port
  67. def parse_root(el):
  68. if el.get("id") is not None:
  69. raise XmlError("<root> state must not have 'id' attribute.")
  70. root = State("", parent=None)
  71. children_dict = {}
  72. transitions = [] # All of the statechart's transitions accumulate here, cause we still need to find their targets, which we can't do before the entire state tree has been built. We find their targets when encoutering the </root> closing tag.
  73. after_id = 0 # After triggers need unique IDs within the scope of the statechart model
  74. refs_to_resolve = [] # Transition targets and INSTATE arguments. Resolved after constructing state tree.
  75. def get_default_state(el, state, children_dict):
  76. have_initial = False
  77. def parse_attr_initial(initial):
  78. nonlocal default_state
  79. nonlocal have_initial
  80. default_state = None
  81. have_initial = True
  82. try:
  83. default_state = children_dict[initial]
  84. except KeyError as e:
  85. raise XmlError("Not a child.") from e
  86. if_attribute(el, "initial", parse_attr_initial)
  87. if not have_initial:
  88. if len(state.children) == 1:
  89. default_state = state.children[0]
  90. else:
  91. raise XmlError("More than 1 child state: must set 'initial' attribute.")
  92. return default_state
  93. def state_child_rules(parent, sibling_dict: Dict[str, State]):
  94. # A transition's guard expression and action statements can read the transition's event parameters, and also possibly the current state configuration. We therefore now wrap these into a function with a bunch of parameters for those values that we want to bring into scope.
  95. def wrap_transition_params(expr_or_stmt, transition: Transition):
  96. if isinstance(expr_or_stmt, Statement):
  97. # Transition's action code
  98. body = expr_or_stmt
  99. elif isinstance(expr_or_stmt, Expression):
  100. # Transition's guard
  101. body = ReturnStatement(expr=expr_or_stmt)
  102. else:
  103. raise Exception("Unexpected error in parser")
  104. # The joy of writing expressions in abstract syntax:
  105. if transition is None:
  106. wrapped = FunctionDeclaration(params_decl=[], body=body)
  107. else:
  108. wrapped = FunctionDeclaration(
  109. params_decl=
  110. # The param '@conf' (which, on purpose, is an illegal identifier in textual concrete syntax, to prevent naming collisions) will contain the statechart's configuration as a bitmap (SCCDInt). This parameter is currently only used in the expansion of the INSTATE-macro.
  111. [ParamDecl(name="@conf", formal_type=SCCDStateConfiguration(state=parent))]
  112. # Plus all the parameters of the enabling events of the transition's trigger:
  113. + [param for event in transition.trigger.enabling for param in event.params_decl],
  114. body=body)
  115. return wrapped
  116. def actions_rules(scope, transition: Transition=None):
  117. def parse_raise(el):
  118. event_name = require_attribute(el, "event")
  119. params = []
  120. def parse_param(el):
  121. # Every event parameter becomes a function, with the event trigger's parameters as parameters
  122. expr_text = require_attribute(el, "expr")
  123. expr = text_parser.parse_expr(expr_text)
  124. function = wrap_transition_params(expr, transition=transition)
  125. function.init_expr(scope)
  126. function.scope.name = "event_param"
  127. params.append(function)
  128. def finish_raise():
  129. param_types = [p.return_type for p in params]
  130. try:
  131. formal_param_types = statechart.out_events[event_name]
  132. result = RaiseOutputEvent(name=event_name, params=params)
  133. except KeyError:
  134. try:
  135. formal_param_types = statechart.internal_events[event_name]
  136. except KeyError:
  137. formal_param_types = param_types
  138. statechart.internal_events[event_name] = formal_param_types
  139. result = RaiseInternalEvent(name=event_name, params=params)
  140. if param_types != formal_param_types:
  141. raise XmlError("Event '%s': Parameter types %s don't match earlier %s" % (event_name, param_types, formal_param_types))
  142. return result
  143. return ([("param*", parse_param)], finish_raise)
  144. def parse_code(el):
  145. def finish_code():
  146. # Every block of code becomes a function, with the event trigger's parameters as parameters
  147. block = text_parser.parse_block(el.text)
  148. function = wrap_transition_params(block, transition=transition)
  149. function.init_expr(scope)
  150. function.scope.name = "code"
  151. return Code(function)
  152. return ([], finish_code)
  153. return {"raise": parse_raise, "code": parse_code}
  154. def common(el, constructor):
  155. short_name = require_attribute(el, "id")
  156. match = re.match("[A-Za-z_][A-Za-z_0-9]*", short_name)
  157. if match is None or match[0] != short_name:
  158. raise XmlError("invalid id")
  159. state = constructor(short_name, parent)
  160. already_there = sibling_dict.setdefault(short_name, state)
  161. if already_there is not state:
  162. raise XmlError("Sibling state with the same id exists.")
  163. return state
  164. def common_nonpseudo(el, constructor):
  165. state = common(el, constructor)
  166. if el.get("stable", "") == "true":
  167. state.stable = True
  168. return state
  169. def parse_state(el):
  170. state = common_nonpseudo(el, State)
  171. children_dict = {}
  172. def finish_state():
  173. if len(state.children) > 0:
  174. state.type = OrState(state=state,
  175. default_state=get_default_state(el, state, children_dict))
  176. else:
  177. state.type = AndState(state=state)
  178. return (state_child_rules(parent=state, sibling_dict=children_dict), finish_state)
  179. def parse_parallel(el):
  180. state = common_nonpseudo(el, State)
  181. state.type = AndState(state=state)
  182. return state_child_rules(parent=state, sibling_dict={})
  183. def parse_history(el):
  184. history_type = el.get("type", "shallow")
  185. if history_type == "deep":
  186. state = common(el, DeepHistoryState)
  187. elif history_type == "shallow":
  188. state = common(el, ShallowHistoryState)
  189. else:
  190. raise XmlError("attribute 'type' must be \"shallow\" or \"deep\".")
  191. def parse_onentry(el):
  192. def finish_onentry(*actions):
  193. parent.enter = actions
  194. return (actions_rules(scope=statechart.scope), finish_onentry)
  195. def parse_onexit(el):
  196. def finish_onexit(*actions):
  197. parent.exit = actions
  198. return (actions_rules(scope=statechart.scope), finish_onexit)
  199. def parse_transition(el):
  200. def macro_in_state(params):
  201. if len(params) != 1:
  202. raise XmlError("Macro @in: Expected 1 parameter")
  203. ref= StateRef(source=parent, path=text_parser.parse_path(params[0].string))
  204. refs_to_resolve.append(ref)
  205. return InStateMacroExpansion(ref=ref)
  206. # INSTATE-macro allowed in transition's guard and actions
  207. text_parser.parser.options.transformer.set_macro("@in", macro_in_state)
  208. if parent is root:
  209. raise XmlError("Root cannot be source of a transition.")
  210. target_string = require_attribute(el, "target")
  211. try:
  212. path = text_parser.parse_path(target_string)
  213. except Exception as e:
  214. raise XmlErrorElement(t_el, "Parsing target '%s': %s" % (transition.target_string, str(e))) from e
  215. transition = Transition(source=parent, path=path)
  216. refs_to_resolve.append(transition)
  217. have_event_attr = False
  218. def parse_attr_event(event):
  219. nonlocal have_event_attr
  220. have_event_attr = True
  221. positive_events, negative_events = text_parser.parse_events_decl(event)
  222. if not negative_events:
  223. transition.trigger = Trigger(positive_events)
  224. else:
  225. transition.trigger = NegatedTrigger(positive_events, negative_events)
  226. def parse_attr_after(after):
  227. nonlocal after_id
  228. if have_event_attr:
  229. raise XmlError("Cannot specify 'after' and 'event' at the same time.")
  230. after_expr = text_parser.parse_expr(after)
  231. after_type = after_expr.init_expr(statechart.scope)
  232. check_duration_type(after_type)
  233. # After-events should only be generated by the runtime.
  234. # By putting a '+' in the event name (which isn't an allowed character in the parser), we ensure that the user will never accidentally (careless) or purposefully (evil) generate a valid after-event.
  235. event_name = "+%d" % after_id
  236. statechart.in_events[event_name] = []
  237. transition.trigger = AfterTrigger(event_name, after_id, after_expr)
  238. after_id += 1
  239. def parse_attr_cond(cond):
  240. # Transition's guard expression
  241. guard_expr = text_parser.parse_expr(cond)
  242. guard_function = wrap_transition_params(expr_or_stmt=guard_expr, transition=transition)
  243. guard_type = guard_function.init_expr(statechart.scope)
  244. guard_function.scope.name = "guard"
  245. if guard_type.return_type is not SCCDBool:
  246. raise XmlError("Guard should be an expression evaluating to 'bool'.")
  247. transition.guard = guard_function
  248. if_attribute(el, "event", parse_attr_event)
  249. if_attribute(el, "after", parse_attr_after)
  250. if_attribute(el, "cond", parse_attr_cond)
  251. def finish_transition(*actions):
  252. transition.actions = actions
  253. transitions.append((transition, el))
  254. parent.transitions.append(transition)
  255. # INSTATE-macro not allowed outside of transition's guard or actions
  256. text_parser.parser.options.transformer.unset_macro("@in")
  257. return (actions_rules(scope=statechart.scope, transition=transition), finish_transition)
  258. return {"state": parse_state, "parallel": parse_parallel, "history": parse_history, "onentry": parse_onentry, "onexit": parse_onexit, "transition": parse_transition}
  259. def finish_root():
  260. root.type = OrState(state=root, default_state=get_default_state(el, root, children_dict))
  261. # State tree has been constructed, we can now resolve state refs:
  262. for ref in refs_to_resolve:
  263. try:
  264. ref.resolve(root=root)
  265. except PathError as e:
  266. raise XmlErrorElement(t_el, "target=\"%s\": %s" % (transition.target_string, str(e))) from e
  267. # Next, visit tree to statically calculate many properties of states and transitions:
  268. statechart.tree = StateTree(root)
  269. return (state_child_rules(root, sibling_dict=children_dict), finish_root)
  270. def finish_statechart():
  271. return statechart
  272. if ext_file is None:
  273. return ([("semantics?", parse_semantics), ("datamodel?", parse_datamodel), ("inport*", get_port_parser(statechart.in_events)), ("outport*", get_port_parser(statechart.out_events)), ("root", parse_root)], finish_statechart)
  274. else:
  275. return ([("override_semantics?", parse_semantics)], finish_statechart)
  276. return parse_statechart