|
|
@@ -5,52 +5,71 @@ from drawio2oml.cbd import trace_abstract_syntax as trace_as
|
|
|
from typing import List, Optional, Dict
|
|
|
|
|
|
# Incomplete - only the bare minimum we need to parse our spring-mass-damper example
|
|
|
-BLOCK_TYPES = {
|
|
|
- "ConstantBlock": {
|
|
|
- "inputs": [],
|
|
|
- "outputs": ["OUT1"],
|
|
|
- },
|
|
|
- "AdderBlock": {
|
|
|
- "inputs": ["IN1", "IN2", "IN3", "IN4"], # actual number of ports is variable
|
|
|
- "outputs": ["OUT1"],
|
|
|
- },
|
|
|
- "IntegratorBlock": {
|
|
|
- "inputs": ["IC", "IN1"],
|
|
|
- "outputs": ["OUT1"],
|
|
|
- },
|
|
|
- "ProductBlock": {
|
|
|
- "inputs": ["IN1", "IN2", "IN3", "IN4"], # actual number of ports is variable
|
|
|
- "outputs": ["OUT1"],
|
|
|
- },
|
|
|
-}
|
|
|
+BLOCK_TYPES = ["ConstantBlock", "AdderBlock", "IntegratorBlock", "ProductBlock"]
|
|
|
+
|
|
|
+PORT_TYPES = ["InputPort", "OutputPort"]
|
|
|
|
|
|
def parse(parent: dio_as.Cell) -> (cbd_as.Model, List[trace_as.TraceabilityLink]):
|
|
|
name_to_block = {}
|
|
|
trace_links = []
|
|
|
|
|
|
+ # Construct blocks
|
|
|
+ blocks = []
|
|
|
for cell in parent.children:
|
|
|
- if "class_name" in cell.properties:
|
|
|
- class_name = cell.properties["class_name"]
|
|
|
- if class_name in BLOCK_TYPES:
|
|
|
- block_type = BLOCK_TYPES[class_name]
|
|
|
- name = cell.properties["block_name"]
|
|
|
- if name == "":
|
|
|
- raise Exception("Encountered CBD block with empty name. Every block must have a (unique) name.")
|
|
|
- if name in name_to_block:
|
|
|
- raise Exception("Non-unique block name: " + name)
|
|
|
- if "numberOfInputs" in cell.properties:
|
|
|
- n_inputs = int(cell.properties["numberOfInputs"])
|
|
|
- in_ports = block_type["inputs"][0:n_inputs]
|
|
|
+ class_name = cell.properties.get("class_name", None)
|
|
|
+ if class_name in BLOCK_TYPES:
|
|
|
+ name = cell.properties["block_name"]
|
|
|
+ if name == "":
|
|
|
+ raise Exception("Encountered CBD block with empty name. Every block must have a (unique) name.")
|
|
|
+ if name in name_to_block:
|
|
|
+ raise Exception("Non-unique block name: " + name)
|
|
|
+ in_ports = []
|
|
|
+ out_ports = []
|
|
|
+ name_to_port = {}
|
|
|
+ for port_cell in cell.children:
|
|
|
+ port_class_name = port_cell.properties.get("class_name", None)
|
|
|
+ port_name = port_cell.properties.get("name")
|
|
|
+ if port_name in name_to_port:
|
|
|
+ raise Exception("Non-unique port name within the scope of its block: " + port_name)
|
|
|
+ if port_class_name == "InputPort":
|
|
|
+ port = cbd_as.InputPort(name=port_name, incoming=[])
|
|
|
+ in_ports.append(port)
|
|
|
+ elif port_class_name == "OutputPort":
|
|
|
+ port = cbd_as.OutputPort(name=port_name, outgoing=[])
|
|
|
+ out_ports.append(port)
|
|
|
else:
|
|
|
- in_ports = block_type["inputs"]
|
|
|
- out_ports = block_type["outputs"]
|
|
|
- block = cbd_as.Block(
|
|
|
- name=name,
|
|
|
- kind=class_name,
|
|
|
- in_ports={n : cbd_as.InputPort(name=n, incoming=[]) for n in in_ports},
|
|
|
- out_ports={n : cbd_as.OutputPort(name=n, outgoing=[]) for n in out_ports},
|
|
|
- )
|
|
|
- name_to_block[name] = block
|
|
|
- trace_links.append(trace_as.TraceabilityLink(dio=cell, cbd=block))
|
|
|
+ continue # not a port
|
|
|
+ name_to_port[port_name] = port
|
|
|
+ trace_links.append(trace_as.TraceabilityLink(dio=port_cell, cbd=port))
|
|
|
+ block = cbd_as.Block(
|
|
|
+ name=name,
|
|
|
+ kind=class_name,
|
|
|
+ in_ports=in_ports,
|
|
|
+ out_ports=out_ports,
|
|
|
+ )
|
|
|
+ name_to_block[name] = block
|
|
|
+ blocks.append(block)
|
|
|
+ trace_links.append(trace_as.TraceabilityLink(dio=cell, cbd=block))
|
|
|
+
|
|
|
+ # Construct connections
|
|
|
+ conns = []
|
|
|
+ for cell in parent.children:
|
|
|
+ if isinstance(cell, dio_as.Edge):
|
|
|
+ print("Edge")
|
|
|
+ src = [tl for tl in trace_links if tl.dio == cell.source]
|
|
|
+ tgt = [tl for tl in trace_links if tl.dio == cell.target]
|
|
|
+ if len(src) == 1 and len(tgt) == 1:
|
|
|
+ # we have ourselves an edge that connects two CBD elements
|
|
|
+ from_port = src[0].cbd
|
|
|
+ to_port = tgt[0].cbd
|
|
|
+ if not isinstance(from_port, cbd_as.OutputPort):
|
|
|
+ raise Exception("Edge should have OutputPort as source")
|
|
|
+ if not isinstance(to_port, cbd_as.InputPort):
|
|
|
+ raise Exception("Edge should have InputPort as target")
|
|
|
+ conn = cbd_as.Connection(from_port=from_port, to_port=to_port)
|
|
|
+ from_port.outgoing.append(conn)
|
|
|
+ to_port.incoming.append(conn)
|
|
|
+ conns.append(conn)
|
|
|
+ trace_links.append(trace_as.TraceabilityLink(dio=cell, cbd=conn))
|
|
|
|
|
|
- return (cbd_as.Model(blocks=[b for b in name_to_block.values()]), trace_links)
|
|
|
+ return (cbd_as.Model(blocks=blocks, conns=conns), trace_links)
|