Source code for parser

"""Test"""
import xml.etree.ElementTree as ET
from urllib.parse import unquote
import base64
import zlib
import re

# Property keys that Node.get_properties_string() always leaves out of the
# string it builds; callers can exclude further keys per call through that
# method's ``ignore`` argument.
IGNORE = ['id', 'label', 'placeholders', 'class_name']
"""Properties to ignore when parsing."""

class Node:
    """Identifies a node object, as found in drawio.

    The same type is used for an individual component and for a "group"
    of multiple nodes, in which case the grouped nodes are stored in
    ``children``.

    Attributes:
        id: Identifier of the node, as handed to the constructor.
        class_name: Name of the class the node represents.
        properties: Mapping of property names to values; ``node[key]``
            reads from this mapping.
        children: Nodes contained in this node (empty for a leaf).
    """

    def __init__(self, id, class_name, properties):
        self.id = id
        self.class_name = class_name
        self.properties = properties
        # Maps a source endpoint to the list of targets it is wired to.
        self._connections = {}
        # Port names are kept in sets, so adding a duplicate is a no-op.
        self._inputs = set()
        self._outputs = set()
        self.children = []

    def __contains__(self, item):
        """Return whether *item* is the name of an input or output port."""
        return item in self._inputs or item in self._outputs

    def __getitem__(self, item):
        """Look up *item* in ``properties``."""
        return self.properties[item]

    def add_input(self, name):
        """Register *name* as an input port."""
        self._inputs.add(name)

    def add_output(self, name):
        """Register *name* as an output port."""
        self._outputs.add(name)

    def get_inputs(self):
        """Return the input port names as a new list (order unspecified)."""
        return list(self._inputs)

    def get_outputs(self):
        """Return the output port names as a new list (order unspecified)."""
        return list(self._outputs)

    def get_connections(self):
        """Return the mapping of each source to its list of targets.

        The internal dictionary itself is returned, not a copy.
        """
        return self._connections

    def add_connection(self, source, target):
        """Record a connection from *source* to *target*.

        Several targets may be attached to one source; they are kept in
        insertion order.
        """
        self._connections.setdefault(source, []).append(target)

    def get_properties_string(self, ignore=None):
        """Render the properties as ``", key=(value)"`` fragments.

        Args:
            ignore: Optional iterable of additional property names to
                leave out, on top of the module-level ``IGNORE`` list.

        Returns:
            The concatenated fragments, each one starting with ``", "``.
            Empty values are rendered as ``None``. The result is the
            empty string when no property remains.
        """
        # Built once instead of once per property; unpacking also accepts
        # tuples, sets and other iterables rather than only lists.
        excluded = (*IGNORE, *(ignore or ()))
        return "".join(
            f", {key}=({value if len(value) > 0 else 'None'})"
            for key, value in self.properties.items()
            if key not in excluded
        )

    def is_empty(self):
        """Return whether the node has no children."""
        return not self.children
class Page:
    """A single page in drawio. Contains multiple nodes.

    Attributes:
        name: The page name as given to the constructor.
    """

    def __init__(self, name):
        self.name = name
        self.__nodes = []
        self.__imports = []

    def add_import(self, im, obj=None):
        """Record an import of module *im*, optionally limited to *obj*.

        The pair ``(im, obj)`` is appended as-is; *obj* stays ``None``
        when it is not given.
        """
        self.__imports.append((im, obj))

    def add_node(self, node):
        """Append *node* to the page; ``None`` is silently skipped."""
        if node is not None:
            self.__nodes.append(node)

    def get_imports(self):
        """Return the list of recorded ``(module, objects)`` pairs."""
        return self.__imports

    def get_nodes(self):
        """Return the list of nodes added to the page."""
        return self.__nodes

    def get_sanitized_name(self):
        """Return the page name reduced to ``[a-zA-Z0-9_]`` characters.

        A leading ``Page-`` is dropped when it is followed by at least one
        digit, so ``Page-3`` becomes ``3``. Every character outside
        letters, digits and underscore is then removed.
        """
        name = self.name
        if re.match(r"Page-\d+", name):
            name = name[len("Page-"):]
        # The remainder is sanitized too: the pattern only anchors at the
        # start, so a name such as "Page-1 (copy)" would otherwise keep
        # its spaces and brackets.
        return re.sub(r"[^a-zA-Z0-9_]", "", name)
class Parser:
    """The drawio parser.

    Args:
        filename: Path of the drawio file; handed to ``ET.parse``.
        setup: Mapping with the keys ``"input class"``, ``"output class"``,
            ``"class object xpath"``, ``"special object xpath"`` and
            ``"verify"``. The ``verify`` entry is called with the list of
            parsed pages at the end of :meth:`parse`.
        ignore_empty_nodes: When true, class objects without child nodes
            are not added to their page.

    Attributes:
        pages: The pages parsed so far, in parsing order.
    """

    def __init__(self, filename, setup, ignore_empty_nodes=False):
        self.filename = filename
        self.input_class = setup["input class"]
        self.output_class = setup["output class"]
        self.class_object_path = setup["class object xpath"]
        self.special_object_path = setup["special object xpath"]
        self.verify = setup["verify"]
        self.ignore_empty_nodes = ignore_empty_nodes
        self.pages = []
        # Maps every class name seen so far to the page that defined it.
        self.__class_names = {}

    @staticmethod
    def decode_and_deflate(data):
        """Decode the compressed payload of a single drawio page.

        Draw.io compresses each page as follows:
            First, all data is url-encoded
            Next, it is compressed/deflated
            Finally, it is encoded according to base64.
        To obtain the page data, we have to do the reverse.

        Args:
            data: The base64 text of the page.

        Returns:
            The root element obtained by parsing the decoded XML with
            ``ET.fromstring``.
        """
        decoded_data = base64.b64decode(data)
        # A negative wbits value makes zlib expect a raw deflate stream,
        # i.e. one without zlib header and trailer.
        inflated = zlib.decompress(decoded_data, -15).decode('utf-8')
        url_decoded_data = unquote(inflated)
        return ET.fromstring(url_decoded_data)

    def parse_page(self, page, nroot):
        """Parses a single page of the model.

        Every element matched by the class object xpath is turned into a
        node and added to *page*. Every element matched by the special
        object xpath whose ``role`` is ``"import"`` is recorded as an
        import. The page is finally appended to ``self.pages``.

        Args:
            page: The page object to fill.
            nroot: The XML element under which the page content is found.
        """
        for obj in nroot.findall(self.class_object_path):
            page.add_node(self.create_node(nroot, obj.attrib, page))

        for obj in nroot.findall(self.special_object_path):
            if obj.attrib["role"] == "import":
                # A missing "objects" attribute is passed on as None, which
                # is also the default of Page.add_import.
                page.add_import(obj.attrib["module"], obj.attrib.get("objects"))

        self.pages.append(page)

    def parse(self):
        """Does the actual file parsing.

        If the file is compressed, we uncompress and work from there.
        If it wasn't compressed, we can work with the whole tree.

        The parsed pages are collected in ``self.pages``, which is handed
        to the ``verify`` callable once all pages are read. Nothing is
        returned.
        """
        root = ET.parse(self.filename).getroot()
        # An uncompressed file carries its mxGraphModel elements in the
        # tree; a compressed one only has encoded text inside each diagram.
        compressed = root.find(".//mxGraphModel") is None
        for page in root.findall(".//diagram"):
            page_obj = Page(page.attrib["name"])
            nroot = self.decode_and_deflate(page.text) if compressed else page
            self.parse_page(page_obj, nroot)
        self.verify(self.pages)

    @staticmethod
    def _resolve_endpoint(element, port_class, lookup):
        """Return the ``(block, port name)`` pair for one end of an edge."""
        if element.attrib["class_name"] == port_class:
            # A port of the enclosing node: identified by its name only.
            return element.attrib["name"], ""
        # A port of a child: the parent of its first sub-element is the id
        # of the child node that owns it.
        return lookup[element[0].attrib["parent"]], element.attrib["name"]

    def create_node(self, root, attr, page):
        """Build the node for one class object, with ports, children, edges.

        Args:
            root: The XML element that is searched for the node's contents.
            attr: The attributes of the class object; must provide
                ``class_name`` and ``id``.
            page: The page the class object is defined on.

        Returns:
            The new node, or ``None`` when ``ignore_empty_nodes`` is set
            and the node has no children.

        Raises:
            ParseException: If the class name was already defined, or if
                it contains whitespace.
        """
        class_name = attr["class_name"]

        # detect duplicate class names
        if class_name in self.__class_names:
            raise ParseException(f"In page {page.name}: duplicate definition of class '{class_name}'. "
                                 f"First defined in page {self.__class_names[class_name].name}.")

        # detect spaces in class names
        if re.search(r"\s", class_name) is not None:
            raise ParseException(f"In page {page.name}: invalid class '{class_name}'. Class names may not contain spaces.")

        node = Node(attr["id"], class_name, attr)
        self.__class_names[class_name] = page

        # Find the children of the node.
        # NOTE(review): the second element parented to the node is taken as
        # the container of its components - presumably the first one is a
        # title/label cell; confirm against the drawio template.
        _rect = root.findall(".//*[@parent='%s']" % node.id)[1]
        container_id = _rect.attrib["id"]
        components = root.findall(".//object/mxCell[@parent='%s']/.." % container_id)

        lookup = {}
        port_classes = (self.input_class, self.output_class)
        for com in components:
            att = com.attrib
            if att["class_name"] in port_classes:
                # Create the ports
                name = att["name"]
                # Duplicate ports are allowed for clarity in the model.
                # They map onto the same port!
                if att["class_name"] == self.input_class:
                    node.add_input(name)
                else:
                    node.add_output(name)
            else:
                # Normal Node
                child = Node(att["id"], att["class_name"], att)
                lookup[child.id] = child
                node.children.append(child)

        if self.ignore_empty_nodes and node.is_empty():
            return None

        for edge in root.findall(".//*[@parent='%s'][@edge='1']" % container_id):
            att = edge.attrib
            source = root.find(".//*[@id='%s']" % att["source"])
            target = root.find(".//*[@id='%s']" % att["target"])

            # TODO: check for valid connection!
            # TODO: also allow attributes on edges?
            node.add_connection(
                self._resolve_endpoint(source, self.input_class, lookup),
                self._resolve_endpoint(target, self.output_class, lookup),
            )
        return node
class ParseException(Exception):
    """Error raised when the parsed model is semantically invalid.

    The single *message* argument becomes the text of the exception.
    """

    def __init__(self, message):
        Exception.__init__(self, message)
def parse_environment(vars):
    """Parses the set of environment variables, given with the
    :code:`-E`/:code:`--environment` variable.

    The value is a comma-separated list of ``NAME=VALUE`` entries.
    Whitespace around names and values is stripped. Only the first ``=``
    of an entry separates name from value, so values may contain ``=``.
    Blank entries (an empty string, a trailing comma) are skipped.

    Args:
        vars: The raw option string, or ``None`` when it was not given.

    Returns:
        A dictionary mapping each name to its value; empty when *vars* is
        ``None`` or holds no entries.

    Raises:
        ValueError: If a non-blank entry contains no ``=``.
    """
    if vars is None:
        return {}

    environment = {}
    for entry in vars.split(","):
        if not entry.strip():
            continue
        name, separator, value = entry.partition("=")
        if not separator:
            raise ValueError(
                f"invalid environment entry {entry.strip()!r}: expected NAME=VALUE"
            )
        environment[name.strip()] = value.strip()
    return environment