xml.py 15 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344
  1. from typing import *
  2. import re
  3. from lark.exceptions import *
  4. from sccd.statechart.static.types import *
  5. from sccd.statechart.static.statechart import *
  6. from sccd.statechart.static.tree import *
  7. from sccd.statechart.dynamic.builtin_scope import *
  8. from sccd.util.xml_parser import *
  9. from sccd.statechart.parser.text import *
  10. from sccd.statechart.static.in_state import InStateMacroExpansion
  11. class SkipFile(Exception):
  12. pass
  13. parse_f = functools.partial(parse, decorate_exceptions=(ModelStaticError,LarkError))
  14. def check_duration_type(type):
  15. if type != SCCDDuration:
  16. msg = "Expression is '%s' type. Expected 'Duration' type." % str(type)
  17. if type == SCCDInt:
  18. msg += "\n Hint: Did you forget a duration unit suffix? ('s', 'ms', ...)"
  19. raise XmlError(msg)
  20. # path: filesystem path for finding external statecharts
  21. def statechart_parser_rules(globals, path, load_external = True, parse_f = parse_f, text_parser=TextParser(globals)) -> Rules:
  22. import os
  23. def parse_statechart(el):
  24. ext_file = el.get("src")
  25. if ext_file is None:
  26. statechart = Statechart(
  27. semantics=SemanticConfiguration(),
  28. scope=Scope("statechart", parent=BuiltIn),
  29. datamodel=None,
  30. internal_events=Bitmap(),
  31. internally_raised_events=Bitmap(),
  32. inport_events={},
  33. event_outport={},
  34. tree=None,
  35. )
  36. else:
  37. if not load_external:
  38. raise SkipFile("Parser configured not to load statecharts from external files.")
  39. statechart = parse_f(os.path.join(path, ext_file), [("statechart", statechart_parser_rules(globals, path, load_external=False, parse_f=parse_f, text_parser=text_parser))])
  40. def parse_semantics(el):
  41. available_aspects = SemanticConfiguration.get_fields()
  42. for aspect_name, text in el.attrib.items():
  43. try:
  44. aspect_type = available_aspects[aspect_name]
  45. except KeyError:
  46. raise XmlError("invalid semantic aspect: '%s'" % aspect_name)
  47. result = text_parser.parse_semantic_choice(text)
  48. if result.data == "wildcard":
  49. semantic_choice = list(aspect_type) # all options
  50. elif result.data == "list":
  51. semantic_choice = [aspect_type[token.value.upper()] for token in result.children]
  52. if len(semantic_choice) == 1:
  53. semantic_choice = semantic_choice[0]
  54. setattr(statechart.semantics, aspect_name, semantic_choice)
  55. def parse_datamodel(el):
  56. body = text_parser.parse_stmt(el.text)
  57. body.init_stmt(statechart.scope)
  58. statechart.datamodel = body
  59. def parse_inport(el):
  60. port_name = require_attribute(el, "name")
  61. globals.inports.assign_id(port_name)
  62. def parse_event(el):
  63. event_name = require_attribute(el, "name")
  64. event_id = globals.events.assign_id(event_name)
  65. port_events = statechart.inport_events.setdefault(port_name, Bitmap())
  66. port_events |= bit(event_id)
  67. statechart.inport_events[port_name] = port_events
  68. statechart.internal_events |= bit(event_id)
  69. return [("event+", parse_event)]
  70. def parse_outport(el):
  71. port_name = require_attribute(el, "name")
  72. def parse_event(el):
  73. event_name = require_attribute(el, "name")
  74. statechart.event_outport[event_name] = port_name
  75. return [("event+", parse_event)]
  76. def parse_root(el):
  77. if el.get("id") is not None:
  78. raise XmlError("<root> state must not have 'id' attribute.")
  79. root = State("", parent=None)
  80. children_dict = {}
  81. transitions = [] # All of the statechart's transitions accumulate here, cause we still need to find their targets, which we can't do before the entire state tree has been built. We find their targets when encoutering the </root> closing tag.
  82. after_id = 0 # After triggers need unique IDs within the scope of the statechart model
  83. refs_to_resolve = [] # Transition targets and INSTATE arguments. Resolved after constructing state tree.
  84. def get_default_state(el, state, children_dict):
  85. have_initial = False
  86. def parse_attr_initial(initial):
  87. nonlocal default_state
  88. nonlocal have_initial
  89. default_state = None
  90. have_initial = True
  91. try:
  92. default_state = children_dict[initial]
  93. except KeyError as e:
  94. raise XmlError("Not a child.") from e
  95. if_attribute(el, "initial", parse_attr_initial)
  96. if not have_initial:
  97. if len(state.children) == 1:
  98. default_state = state.children[0]
  99. else:
  100. raise XmlError("More than 1 child state: must set 'initial' attribute.")
  101. return default_state
  102. def state_child_rules(parent, sibling_dict: Dict[str, State]):
  103. # A transition's guard expression and action statements can read the transition's event parameters, and also possibly the current state configuration. We therefore now wrap these into a function with a bunch of parameters for those values that we want to bring into scope.
  104. def wrap_transition_params(expr_or_stmt, trigger: Trigger):
  105. if isinstance(expr_or_stmt, Statement):
  106. # Transition's action code
  107. body = expr_or_stmt
  108. elif isinstance(expr_or_stmt, Expression):
  109. # Transition's guard
  110. body = ReturnStatement(expr=expr_or_stmt)
  111. else:
  112. raise Exception("Unexpected error in parser")
  113. # The joy of writing expressions in abstract syntax:
  114. wrapped = FunctionDeclaration(
  115. params_decl=
  116. # The param '@conf' (which, on purpose, is an illegal identifier in textual concrete syntax, to prevent naming collisions) will contain the statechart's configuration as a bitmap (SCCDInt). This parameter is currently only used in the expansion of the INSTATE-macro.
  117. [ParamDecl(name="@conf", formal_type=SCCDStateConfiguration(state=parent))]
  118. # Plus all the parameters of the enabling events of the transition's trigger:
  119. + [param for event in trigger.enabling for param in event.params_decl],
  120. body=body)
  121. return wrapped
  122. def actions_rules(scope, wrap_trigger: Trigger = EMPTY_TRIGGER):
  123. def parse_raise(el):
  124. params = []
  125. def parse_param(el):
  126. expr_text = require_attribute(el, "expr")
  127. expr = text_parser.parse_expr(expr_text)
  128. function = wrap_transition_params(expr, trigger=wrap_trigger)
  129. function.init_expr(scope)
  130. function.scope.name = "event_param"
  131. params.append(function)
  132. def finish_raise():
  133. event_name = require_attribute(el, "event")
  134. try:
  135. port = statechart.event_outport[event_name]
  136. except KeyError:
  137. # Legacy fallback: read port from attribute
  138. port = el.get("port")
  139. if port is None:
  140. # internal event
  141. event_id = globals.events.assign_id(event_name)
  142. statechart.internally_raised_events |= bit(event_id)
  143. return RaiseInternalEvent(event_id=event_id, name=event_name, params=params)
  144. else:
  145. # output event - no ID in global namespace
  146. statechart.event_outport[event_name] = port
  147. globals.outports.assign_id(port)
  148. return RaiseOutputEvent(name=event_name, params=params, outport=port)
  149. return ([("param*", parse_param)], finish_raise)
  150. def parse_code(el):
  151. def finish_code():
  152. block = text_parser.parse_stmt(el.text)
  153. function = wrap_transition_params(block, trigger=wrap_trigger)
  154. function.init_expr(scope)
  155. function.scope.name = "code"
  156. return Code(function)
  157. return ([], finish_code)
  158. return {"raise": parse_raise, "code": parse_code}
  159. def common(el, constructor):
  160. short_name = require_attribute(el, "id")
  161. match = re.match("[A-Za-z_][A-Za-z_0-9]*", short_name)
  162. if match is None or match[0] != short_name:
  163. raise XmlError("invalid id")
  164. state = constructor(short_name, parent)
  165. already_there = sibling_dict.setdefault(short_name, state)
  166. if already_there is not state:
  167. raise XmlError("Sibling state with the same id exists.")
  168. return state
  169. def common_nonpseudo(el, constructor):
  170. state = common(el, constructor)
  171. if el.get("stable", "") == "true":
  172. state.stable = True
  173. return state
  174. def parse_state(el):
  175. state = common_nonpseudo(el, State)
  176. children_dict = {}
  177. def finish_state():
  178. if len(state.children) > 0:
  179. state.type = OrState(state=state,
  180. default_state=get_default_state(el, state, children_dict))
  181. else:
  182. state.type = AndState(state=state)
  183. return (state_child_rules(parent=state, sibling_dict=children_dict), finish_state)
  184. def parse_parallel(el):
  185. state = common_nonpseudo(el, State)
  186. state.type = AndState(state=state)
  187. return state_child_rules(parent=state, sibling_dict={})
  188. def parse_history(el):
  189. history_type = el.get("type", "shallow")
  190. if history_type == "deep":
  191. state = common(el, DeepHistoryState)
  192. elif history_type == "shallow":
  193. state = common(el, ShallowHistoryState)
  194. else:
  195. raise XmlError("attribute 'type' must be \"shallow\" or \"deep\".")
  196. def parse_onentry(el):
  197. def finish_onentry(*actions):
  198. parent.enter = actions
  199. return (actions_rules(scope=statechart.scope), finish_onentry)
  200. def parse_onexit(el):
  201. def finish_onexit(*actions):
  202. parent.exit = actions
  203. return (actions_rules(scope=statechart.scope), finish_onexit)
  204. def parse_transition(el):
  205. def macro_in_state(params):
  206. if len(params) != 1:
  207. raise XmlError("Macro @in: Expected 1 parameter")
  208. ref= StateRef(source=parent, path=text_parser.parse_path(params[0].string))
  209. refs_to_resolve.append(ref)
  210. return InStateMacroExpansion(ref=ref)
  211. # INSTATE-macro allowed in transition's guard and actions
  212. text_parser.parser.options.transformer.set_macro("@in", macro_in_state)
  213. if parent is root:
  214. raise XmlError("Root cannot be source of a transition.")
  215. target_string = require_attribute(el, "target")
  216. try:
  217. path = text_parser.parse_path(target_string)
  218. except Exception as e:
  219. raise XmlErrorElement(t_el, "Parsing target '%s': %s" % (transition.target_string, str(e))) from e
  220. transition = Transition(source=parent, path=path)
  221. refs_to_resolve.append(transition)
  222. have_event_attr = False
  223. def parse_attr_event(event):
  224. nonlocal have_event_attr
  225. have_event_attr = True
  226. positive_events, negative_events = text_parser.parse_events_decl(event)
  227. # Optimization: sort events by ID
  228. # Allows us to save time later.
  229. positive_events.sort(key=lambda e: e.id)
  230. if not negative_events:
  231. transition.trigger = Trigger(positive_events)
  232. statechart.internal_events |= transition.trigger.enabling_bitmap
  233. else:
  234. transition.trigger = NegatedTrigger(positive_events, negative_events)
  235. statechart.internal_events |= transition.trigger.enabling_bitmap
  236. def parse_attr_after(after):
  237. nonlocal after_id
  238. if have_event_attr:
  239. raise XmlError("Cannot specify 'after' and 'event' at the same time.")
  240. after_expr = text_parser.parse_expr(after)
  241. after_type = after_expr.init_expr(statechart.scope)
  242. check_duration_type(after_type)
  243. # After-events should only be generated by the runtime.
  244. # By putting a '+' in the event name (which isn't an allowed character in the parser), we ensure that the user will never accidentally (careless) or purposefully (evil) generate a valid after-event.
  245. event_name = "+%d" % after_id
  246. transition.trigger = AfterTrigger(globals.events.assign_id(event_name), event_name, after_id, after_expr)
  247. statechart.internal_events |= transition.trigger.enabling_bitmap
  248. after_id += 1
  249. def parse_attr_cond(cond):
  250. # Transition's guard expression
  251. guard_expr = text_parser.parse_expr(cond)
  252. guard_function = wrap_transition_params(guard_expr, transition.trigger)
  253. guard_type = guard_function.init_expr(statechart.scope)
  254. guard_function.scope.name = "guard"
  255. if guard_type.return_type is not SCCDBool:
  256. raise XmlError("Guard should be an expression evaluating to 'bool'.")
  257. transition.guard = guard_function
  258. if_attribute(el, "event", parse_attr_event)
  259. if_attribute(el, "after", parse_attr_after)
  260. if_attribute(el, "cond", parse_attr_cond)
  261. def finish_transition(*actions):
  262. transition.actions = actions
  263. transitions.append((transition, el))
  264. parent.transitions.append(transition)
  265. # INSTATE-macro not allowed outside of transition's guard or actions
  266. text_parser.parser.options.transformer.unset_macro("@in")
  267. return (actions_rules(scope=statechart.scope, wrap_trigger=transition.trigger), finish_transition)
  268. return {"state": parse_state, "parallel": parse_parallel, "history": parse_history, "onentry": parse_onentry, "onexit": parse_onexit, "transition": parse_transition}
  269. def finish_root():
  270. root.type = OrState(state=root, default_state=get_default_state(el, root, children_dict))
  271. # State tree has been constructed, we can now resolve state refs:
  272. for ref in refs_to_resolve:
  273. try:
  274. ref.resolve(root=root)
  275. except PathError as e:
  276. raise XmlErrorElement(t_el, "target=\"%s\": %s" % (transition.target_string, str(e))) from e
  277. # Next, visit tree to statically calculate many properties of states and transitions:
  278. statechart.tree = StateTree(root)
  279. return (state_child_rules(root, sibling_dict=children_dict), finish_root)
  280. def finish_statechart():
  281. return statechart
  282. if ext_file is None:
  283. return ([("semantics?", parse_semantics), ("datamodel?", parse_datamodel), ("inport*", parse_inport), ("outport*", parse_outport), ("root", parse_root)], finish_statechart)
  284. else:
  285. return ([("override_semantics?", parse_semantics)], finish_statechart)
  286. return parse_statechart