xml.py 14 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342
  1. from typing import *
  2. import re
  3. from lark.exceptions import *
  4. from sccd.statechart.static.types import *
  5. from sccd.statechart.static.statechart import *
  6. from sccd.statechart.static.tree import *
  7. from sccd.statechart.dynamic.builtin_scope import *
  8. from sccd.util.xml_parser import *
  9. from sccd.statechart.parser.text import *
  10. from sccd.statechart.static.in_state import InStateMacroExpansion
  11. class SkipFile(Exception):
  12. pass
# Default file-parsing function used by this module: `parse` with its
# `decorate_exceptions` argument pre-bound to ModelStaticError and LarkError.
# NOTE(review): presumably `parse` annotates these exception types with XML
# source location info -- confirm against sccd.util.xml_parser.
parse_f = functools.partial(parse, decorate_exceptions=(ModelStaticError,LarkError))
  14. def check_duration_type(type):
  15. if type != SCCDDuration:
  16. msg = "Expression is '%s' type. Expected 'Duration' type." % str(type)
  17. if type == SCCDInt:
  18. msg += "\n Hint: Did you forget a duration unit suffix? ('s', 'ms', ...)"
  19. raise XmlError(msg)
  20. # path: filesystem path for finding external statecharts
  21. def statechart_parser_rules(globals, path, load_external = True, parse_f = parse_f, text_parser=TextParser(globals)) -> Rules:
  22. import os
  23. def parse_statechart(el):
  24. ext_file = el.get("src")
  25. if ext_file is None:
  26. statechart = Statechart(
  27. semantics=SemanticConfiguration(),
  28. scope=Scope("statechart", parent=BuiltIn),
  29. datamodel=None,
  30. internal_events=Bitmap(),
  31. internally_raised_events=Bitmap(),
  32. inport_events={},
  33. event_outport={},
  34. tree=None,
  35. )
  36. else:
  37. if not load_external:
  38. raise SkipFile("Parser configured not to load statecharts from external files.")
  39. statechart = parse_f(os.path.join(path, ext_file), [("statechart", statechart_parser_rules(globals, path, load_external=False, parse_f=parse_f, text_parser=text_parser))])
  40. def parse_semantics(el):
  41. available_aspects = SemanticConfiguration.get_fields()
  42. for aspect_name, text in el.attrib.items():
  43. try:
  44. aspect_type = available_aspects[aspect_name]
  45. except KeyError:
  46. raise XmlError("invalid semantic aspect: '%s'" % aspect_name)
  47. result = text_parser.parse_semantic_choice(text)
  48. if result.data == "wildcard":
  49. semantic_choice = list(aspect_type) # all options
  50. elif result.data == "list":
  51. semantic_choice = [aspect_type[token.value.upper()] for token in result.children]
  52. if len(semantic_choice) == 1:
  53. semantic_choice = semantic_choice[0]
  54. setattr(statechart.semantics, aspect_name, semantic_choice)
  55. def parse_datamodel(el):
  56. body = text_parser.parse_stmt(el.text)
  57. body.init_stmt(statechart.scope)
  58. statechart.datamodel = body
  59. def parse_inport(el):
  60. port_name = require_attribute(el, "name")
  61. globals.inports.assign_id(port_name)
  62. def parse_event(el):
  63. event_name = require_attribute(el, "name")
  64. event_id = globals.events.assign_id(event_name)
  65. port_events = statechart.inport_events.setdefault(port_name, Bitmap())
  66. port_events |= bit(event_id)
  67. statechart.inport_events[port_name] = port_events
  68. statechart.internal_events |= bit(event_id)
  69. return [("event+", parse_event)]
  70. def parse_outport(el):
  71. port_name = require_attribute(el, "name")
  72. def parse_event(el):
  73. event_name = require_attribute(el, "name")
  74. statechart.event_outport[event_name] = port_name
  75. return [("event+", parse_event)]
  76. def parse_root(el):
  77. if el.get("id") is not None:
  78. raise XmlError("<root> state must not have 'id' attribute.")
  79. root = State("", parent=None)
  80. children_dict = {}
  81. transitions = [] # All of the statechart's transitions accumulate here, cause we still need to find their targets, which we can't do before the entire state tree has been built. We find their targets when encoutering the </root> closing tag.
  82. after_id = 0 # After triggers need unique IDs within the scope of the statechart model
  83. refs_to_resolve = [] # Transition targets and INSTATE arguments. Resolved after constructing state tree.
  84. def get_default_state(el, state, children_dict):
  85. have_initial = False
  86. def parse_attr_initial(initial):
  87. nonlocal default_state
  88. nonlocal have_initial
  89. default_state = None
  90. have_initial = True
  91. try:
  92. default_state = children_dict[initial]
  93. except KeyError as e:
  94. raise XmlError("Not a child.") from e
  95. if_attribute(el, "initial", parse_attr_initial)
  96. if not have_initial:
  97. if len(state.children) == 1:
  98. default_state = state.children[0]
  99. else:
  100. raise XmlError("More than 1 child state: must set 'initial' attribute.")
  101. return default_state
  102. def state_child_rules(parent, sibling_dict: Dict[str, State]):
  103. def macro_in_state(params):
  104. if len(params) != 1:
  105. raise XmlError("Macro @in: Expected 1 parameter")
  106. ref= StateRef(source=parent, path=text_parser.parse_path(params[0].string))
  107. refs_to_resolve.append(ref)
  108. return InStateMacroExpansion(ref=ref)
  109. text_parser.parser.options.transformer.set_macro("@in", macro_in_state)
  110. # A transition's guard expression and action statements can read the transition's event parameters, and also possibly the current state configuration. We therefore now wrap these into a function with a bunch of parameters for those values that we want to bring into scope.
  111. def wrap_transition_params(expr_or_stmt, trigger: Trigger):
  112. if isinstance(expr_or_stmt, Statement):
  113. # Transition's action code
  114. body = expr_or_stmt
  115. elif isinstance(expr_or_stmt, Expression):
  116. # Transition's guard
  117. body = ReturnStatement(expr=expr_or_stmt)
  118. else:
  119. raise Exception("Unexpected error in parser")
  120. # The joy of writing expressions in abstract syntax:
  121. wrapped = FunctionDeclaration(
  122. params_decl=
  123. # The param '@conf' (which, on purpose, is an illegal identifier in textual concrete syntax, to prevent naming collisions) will contain the statechart's configuration as a bitmap (SCCDInt). This parameter is currently only used in the expansion of the INSTATE-macro.
  124. [ParamDecl(name="@conf", formal_type=SCCDStateConfiguration(state=parent))]
  125. # Plus all the parameters of the enabling events of the transition's trigger:
  126. + [param for event in trigger.enabling for param in event.params_decl],
  127. body=body)
  128. return wrapped
  129. def actions_rules(scope, wrap_trigger: Trigger = EMPTY_TRIGGER):
  130. def parse_raise(el):
  131. params = []
  132. def parse_param(el):
  133. expr_text = require_attribute(el, "expr")
  134. expr = text_parser.parse_expr(expr_text)
  135. function = wrap_transition_params(expr, trigger=wrap_trigger)
  136. function.init_expr(scope)
  137. function.scope.name = "event_param"
  138. params.append(function)
  139. def finish_raise():
  140. event_name = require_attribute(el, "event")
  141. try:
  142. port = statechart.event_outport[event_name]
  143. except KeyError:
  144. # Legacy fallback: read port from attribute
  145. port = el.get("port")
  146. if port is None:
  147. # internal event
  148. event_id = globals.events.assign_id(event_name)
  149. statechart.internally_raised_events |= bit(event_id)
  150. return RaiseInternalEvent(event_id=event_id, name=event_name, params=params)
  151. else:
  152. # output event - no ID in global namespace
  153. statechart.event_outport[event_name] = port
  154. globals.outports.assign_id(port)
  155. return RaiseOutputEvent(name=event_name, params=params, outport=port)
  156. return ([("param*", parse_param)], finish_raise)
  157. def parse_code(el):
  158. def finish_code():
  159. block = text_parser.parse_stmt(el.text)
  160. function = wrap_transition_params(block, trigger=wrap_trigger)
  161. function.init_expr(scope)
  162. function.scope.name = "code"
  163. return Code(function)
  164. return ([], finish_code)
  165. return {"raise": parse_raise, "code": parse_code}
  166. def common(el, constructor):
  167. short_name = require_attribute(el, "id")
  168. match = re.match("[A-Za-z_][A-Za-z_0-9]*", short_name)
  169. if match is None or match[0] != short_name:
  170. raise XmlError("invalid id")
  171. state = constructor(short_name, parent)
  172. already_there = sibling_dict.setdefault(short_name, state)
  173. if already_there is not state:
  174. raise XmlError("Sibling state with the same id exists.")
  175. return state
  176. def common_nonpseudo(el, constructor):
  177. state = common(el, constructor)
  178. if el.get("stable", "") == "true":
  179. state.stable = True
  180. return state
  181. def parse_state(el):
  182. state = common_nonpseudo(el, State)
  183. children_dict = {}
  184. def finish_state():
  185. if len(state.children) > 0:
  186. state.type = OrState(state=state,
  187. default_state=get_default_state(el, state, children_dict))
  188. else:
  189. state.type = AndState(state=state)
  190. return (state_child_rules(parent=state, sibling_dict=children_dict), finish_state)
  191. def parse_parallel(el):
  192. state = common_nonpseudo(el, State)
  193. state.type = AndState(state=state)
  194. return state_child_rules(parent=state, sibling_dict={})
  195. def parse_history(el):
  196. history_type = el.get("type", "shallow")
  197. if history_type == "deep":
  198. state = common(el, DeepHistoryState)
  199. elif history_type == "shallow":
  200. state = common(el, ShallowHistoryState)
  201. else:
  202. raise XmlError("attribute 'type' must be \"shallow\" or \"deep\".")
  203. def parse_onentry(el):
  204. def finish_onentry(*actions):
  205. parent.enter = actions
  206. return (actions_rules(scope=statechart.scope), finish_onentry)
  207. def parse_onexit(el):
  208. def finish_onexit(*actions):
  209. parent.exit = actions
  210. return (actions_rules(scope=statechart.scope), finish_onexit)
  211. def parse_transition(el):
  212. if parent is root:
  213. raise XmlError("Root cannot be source of a transition.")
  214. target_string = require_attribute(el, "target")
  215. try:
  216. path = text_parser.parse_path(target_string)
  217. except Exception as e:
  218. raise XmlErrorElement(t_el, "Parsing target '%s': %s" % (transition.target_string, str(e))) from e
  219. transition = Transition(source=parent, path=path)
  220. refs_to_resolve.append(transition)
  221. have_event_attr = False
  222. def parse_attr_event(event):
  223. nonlocal have_event_attr
  224. have_event_attr = True
  225. positive_events, negative_events = text_parser.parse_events_decl(event)
  226. # Optimization: sort events by ID
  227. # Allows us to save time later.
  228. positive_events.sort(key=lambda e: e.id)
  229. if not negative_events:
  230. transition.trigger = Trigger(positive_events)
  231. statechart.internal_events |= transition.trigger.enabling_bitmap
  232. else:
  233. transition.trigger = NegatedTrigger(positive_events, negative_events)
  234. statechart.internal_events |= transition.trigger.enabling_bitmap
  235. def parse_attr_after(after):
  236. nonlocal after_id
  237. if have_event_attr:
  238. raise XmlError("Cannot specify 'after' and 'event' at the same time.")
  239. after_expr = text_parser.parse_expr(after)
  240. after_type = after_expr.init_expr(statechart.scope)
  241. check_duration_type(after_type)
  242. # After-events should only be generated by the runtime.
  243. # By putting a '+' in the event name (which isn't an allowed character in the parser), we ensure that the user will never accidentally (careless) or purposefully (evil) generate a valid after-event.
  244. event_name = "+%d" % after_id
  245. transition.trigger = AfterTrigger(globals.events.assign_id(event_name), event_name, after_id, after_expr)
  246. statechart.internal_events |= transition.trigger.enabling_bitmap
  247. after_id += 1
  248. def parse_attr_cond(cond):
  249. # Transition's guard expression
  250. guard_expr = text_parser.parse_expr(cond)
  251. guard_function = wrap_transition_params(guard_expr, transition.trigger)
  252. guard_type = guard_function.init_expr(statechart.scope)
  253. guard_function.scope.name = "guard"
  254. if guard_type.return_type is not SCCDBool:
  255. raise XmlError("Guard should be an expression evaluating to 'bool'.")
  256. transition.guard = guard_function
  257. if_attribute(el, "event", parse_attr_event)
  258. if_attribute(el, "after", parse_attr_after)
  259. if_attribute(el, "cond", parse_attr_cond)
  260. def finish_transition(*actions):
  261. transition.actions = actions
  262. transitions.append((transition, el))
  263. parent.transitions.append(transition)
  264. return (actions_rules(scope=statechart.scope, wrap_trigger=transition.trigger), finish_transition)
  265. def finish_state_child_rules():
  266. text_parser.parser.options.transformer.unset_macro("@in")
  267. return ({"state": parse_state, "parallel": parse_parallel, "history": parse_history, "onentry": parse_onentry, "onexit": parse_onexit, "transition": parse_transition}, finish_state_child_rules)
  268. def finish_root():
  269. root.type = OrState(state=root, default_state=get_default_state(el, root, children_dict))
  270. # State tree has been constructed, we can now resolve state refs:
  271. for ref in refs_to_resolve:
  272. try:
  273. ref.resolve(root=root)
  274. except PathError as e:
  275. raise XmlErrorElement(t_el, "target=\"%s\": %s" % (transition.target_string, str(e))) from e
  276. # Next, visit tree to statically calculate many properties of states and transitions:
  277. statechart.tree = StateTree(root)
  278. return (state_child_rules(root, sibling_dict=children_dict), finish_root)
  279. def finish_statechart():
  280. return statechart
  281. if ext_file is None:
  282. return ([("semantics?", parse_semantics), ("datamodel?", parse_datamodel), ("inport*", parse_inport), ("outport*", parse_outport), ("root", parse_root)], finish_statechart)
  283. else:
  284. return ([("override_semantics?", parse_semantics)], finish_statechart)
  285. return parse_statechart