xml_parser.py 8.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232
  1. from enum import *
  2. from typing import *
  3. from lxml import etree
  4. import termcolor
  5. from sccd.util.debug import *
  6. # Raising this type of error during parsing will cause the error to be "decorated" with the current element being parsed highlighted in a displayed fragment of the XML source file
  7. class XmlError(Exception):
  8. pass
  9. # Similar to XmlError, but with a specified element instead of the current element.
  10. class XmlErrorElement(Exception):
  11. def __init__(self, el: etree.Element, msg):
  12. super().__init__(msg)
  13. self.el = el
  14. # Returns multiline string containing fragment of src_file with 'el' XML element highlighted.
  15. def xml_fragment(src_file: str, el: etree.Element) -> str:
  16. # This is really dirty, but can't find a clean way to do this with lxml.
  17. parent = el.getparent()
  18. if parent is None:
  19. parent = el
  20. with open(src_file, 'r') as file:
  21. lines = file.read().split('\n')
  22. numbered_lines = list(enumerate(lines, 1))
  23. parent_lines = etree.tostring(parent).decode('utf-8').strip().split('\n')
  24. el_lines = etree.tostring(el).decode('utf-8').strip().split('\n')
  25. text = []
  26. parent_firstline = parent.sourceline
  27. parent_lastline = parent.sourceline + len(parent_lines) - 1
  28. el_firstline = el.sourceline
  29. el_lastline = el.sourceline + len(el_lines) - 1
  30. from_line = max(parent_firstline, el_firstline - 4)
  31. to_line = min(parent_lastline, el_lastline + 4)
  32. def f(tup):
  33. return from_line <= tup[0] <= to_line
  34. for linenumber, line in filter(f, numbered_lines):
  35. ll = "%4d: %s" % (linenumber, line)
  36. if el_firstline <= linenumber <= el_lastline:
  37. ll = termcolor.colored(ll, 'yellow')
  38. text.append(ll)
  39. return "\n\n%s\n\n%s:\nline %d: <%s>: " % ('\n'.join(text), src_file,el.sourceline, el.tag)
  40. ParseElementF = Callable[[etree.Element], Optional['RulesWDone']]
  41. OrderedElements = List[Tuple[str, ParseElementF]]
  42. UnorderedElements = Dict[str, ParseElementF]
  43. Rules = Union[OrderedElements, UnorderedElements]
  44. RulesWDone = Union[Rules, Tuple[Rules,Callable]]
  45. # TODO: Refactor for readability :)
  46. # -> Introduce some actual domain-specific types instead of using lists, dicts and tuples
  47. # A very beefy parsing function on top of 'lxml' event-driven parsing, that takes parsing rules in a very powerful, schema-like format.
  48. # The 'rules' passed should be one of:
  49. # 1) A dictionary of XML tags mapped to a visit-calback, to denote that any of the tags in are allowed in any order and in any multiplicity.
  50. # 2) A list of tuples (pairs): (tag, visit-callback), to denote that the tags MUST occur in the given order. Additionally, each tag may have a multiplicity-suffix: '*' for any, '+' for at least once, '?' for optional, and no suffix for once.
  51. # A visit callback will be called with a single 'etree.XMLElement' argument, when an opening tag of an XML element is encountered and matches with a rule in the current 'rules' object.
  52. # Every visit callback MAY return a new set of 'rules' (= a dict or list) that will be used for the children elements of the element visited. If nothing is returned, the element is not allowed to have any children.
  53. # Finally, a 'rules' object may also be a tuple (rules, when_done), where 'rules' is a dict or list as described above, and 'when_done' is an additional callback, called when the closing tag of the element is encountered (after all children have been visited). From this callback, any value may be returned. The values returned by the 'when_done'-callbacks of the children of an element will be passed as arguments to the 'when_done' of this element.
  54. # The parse function itself returns the value returned by the parser rule of the 'when_done' of the document's root element.
  55. def parse(src_file, rules: RulesWDone, ignore_unmatched = False, decorate_exceptions = ()):
  56. class Multiplicity(Flag):
  57. AT_LEAST_ONCE = auto()
  58. AT_MOST_ONCE = auto()
  59. ANY = 0
  60. ONCE = AT_LEAST_ONCE | AT_MOST_ONCE
  61. OPTIONAL = AT_MOST_ONCE
  62. MULTIPLE = AT_LEAST_ONCE
  63. @staticmethod
  64. def parse_suffix(tag: str) -> Tuple[str, 'Multiplicity']:
  65. if tag.endswith("*"):
  66. m = Multiplicity.ANY
  67. tag = tag[:-1]
  68. elif tag.endswith("?"):
  69. m = Multiplicity.OPTIONAL
  70. tag = tag[:-1]
  71. elif tag.endswith("+"):
  72. m = Multiplicity.MULTIPLE
  73. tag = tag[:-1]
  74. else:
  75. m = Multiplicity.ONCE
  76. return tag, m
  77. def unparse_suffix(self, tag: str) -> str:
  78. return tag + {
  79. Multiplicity.ANY: "*",
  80. Multiplicity.ONCE: "",
  81. Multiplicity.OPTIONAL: "?",
  82. Multiplicity.MULTIPLE: "+"
  83. }[self]
  84. rules_stack = [rules]
  85. results_stack = [[]]
  86. def unpack_tuple(rules):
  87. when_done = []
  88. while isinstance(rules, tuple):
  89. assert len(rules) == 2
  90. when_done.append(rules[1])
  91. rules = rules[0]
  92. return (rules, when_done)
  93. def pack_tuple(rules, when_done):
  94. for cb in reversed(when_done):
  95. rules = (rules, cb)
  96. return rules
  97. for event, el in etree.iterparse(src_file, events=("start", "end")):
  98. try:
  99. rules, when_done = unpack_tuple(rules_stack[-1])
  100. if event == "start":
  101. # print("start", el.tag)
  102. parse_function = None
  103. if isinstance(rules, dict):
  104. # print("rules:", list(rules.keys()))
  105. try:
  106. parse_function = rules[el.tag]
  107. except KeyError as e:
  108. pass
  109. elif isinstance(rules, list):
  110. # print("rules:", [rule[0] for rule in rules])
  111. # Expecting elements in certain order and with certain multiplicities
  112. skipped_tags = []
  113. while len(rules) > 0:
  114. tag_w_suffix, func = rules[0]
  115. tag, m = Multiplicity.parse_suffix(tag_w_suffix)
  116. if tag == el.tag:
  117. # Match!
  118. if m & Multiplicity.AT_MOST_ONCE:
  119. # We don't allow this element next time
  120. rules = rules[1:]
  121. rules_stack[-1] = pack_tuple(rules, when_done)
  122. elif m & Multiplicity.AT_LEAST_ONCE:
  123. # We don't require this element next time
  124. m &= ~Multiplicity.AT_LEAST_ONCE
  125. rules = list(rules) # copy list before editing
  126. rules[0] = (m.unparse_suffix(tag), func) # edit rule
  127. rules_stack[-1] = pack_tuple(rules, when_done)
  128. parse_function = func
  129. break
  130. else:
  131. skipped_tags.append(tag)
  132. if m & Multiplicity.AT_LEAST_ONCE:
  133. raise XmlError("Unexpected element. Expected one of: %s" % ", ".join("<%s>" % t for t in skipped_tags))
  134. else:
  135. # Element is skipable
  136. rules = rules[1:]
  137. rules_stack[-1] = pack_tuple(rules, when_done)
  138. else:
  139. print(rules)
  140. assert False # rule should always be a dict or list
  141. if parse_function:
  142. children_rules = parse_function(el)
  143. if children_rules:
  144. rules_stack.append(children_rules)
  145. else:
  146. rules_stack.append([])
  147. else:
  148. if not ignore_unmatched:
  149. raise XmlError("Unexpected element.")
  150. else:
  151. rules_stack.append([])
  152. results_stack.append([])
  153. elif event == "end":
  154. if isinstance(rules, list) and len(rules) > 1:
  155. missing_required = []
  156. for rule in rules:
  157. tag_w_suffix, func = rule
  158. tag, m = Multiplicity.parse_suffix(tag_w_suffix)
  159. if m & Multiplicity.AT_LEAST_ONCE:
  160. missing_required.append(tag)
  161. if missing_required:
  162. raise XmlError("Missing required elements: %s " % ", ".join("<%s>" % t for t in missing_required))
  163. children_results = results_stack.pop()
  164. pair = rules_stack.pop()
  165. for cb in when_done:
  166. result = cb(*children_results)
  167. # print("end", el.tag, "with result=", result)
  168. if result:
  169. results_stack[-1].append(result)
  170. except (XmlError, *decorate_exceptions) as e:
  171. # Assume exception occured while visiting current element 'el':
  172. # Re-write exception message:
  173. e.args = (xml_fragment(src_file, el) + str(e),)
  174. raise
  175. except XmlErrorElement as e:
  176. # Element where exception occured is part of exception object:
  177. # Re-write exception message:
  178. e.args = (xml_fragment(src_file, e.el) + str(e),)
  179. raise
  180. results = results_stack[0] # sole stack frame remaining
  181. if len(results) > 0:
  182. return results[0] # return first item, since we expect at most one item since an XML file has only one root node
# Utility functions to do more with fewer lines of code:
  184. def require_attribute(el, attr):
  185. val = el.get(attr)
  186. if val is None:
  187. raise XmlErrorElement(el, "missing required attribute '%s'" % attr)
  188. return val
  189. def if_attribute(el, attr, callback):
  190. val = el.get(attr)
  191. if val is not None:
  192. try:
  193. callback(val)
  194. except Exception as e:
  195. raise XmlErrorElement(el, "attribute %s=\"%s\": %s" % (attr, val, str(e))) from e