| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211 |
- from typing import Any, Optional, List, Tuple
- import igraph as ig
- from state.base import State, Node, Edge, Element
- class iGraphState(State):
- """
- Use igraph as ModelverseState.
- """
- NODE_IS_EDGE_ATTR = "edge"
- NODE_LABEL_ATTR = "label"
- VALUE_ATTR = "value"
- def __init__(self):
- self.graph = ig.Graph(directed=True)
- self.mapper = {}
- self.root = self.create_node()
- def __repr__(self):
- return "iGraphState {%s}" % str(list(self.mapper.keys()))
- def create_node(self) -> Node:
- new_id = self.new_id()
- self.mapper[new_id] = self.graph.add_vertex()
- self.mapper[new_id][self.NODE_LABEL_ATTR] = str(new_id)
- self.mapper[new_id][self.NODE_IS_EDGE_ATTR] = False
- self.mapper[new_id][self.VALUE_ATTR] = None
- return new_id
- def create_edge(self, source: Element, target: Element) -> Optional[Edge]:
- if source not in self.mapper:
- return None
- if target not in self.mapper:
- return None
- new_id = self.create_node()
- self.mapper[new_id][self.NODE_IS_EDGE_ATTR] = True
- self.mapper[new_id]["source"] = source
- self.mapper[new_id]["target"] = target
- self.graph.add_edge(self.mapper[source], self.mapper[new_id])
- self.graph.add_edge(self.mapper[new_id], self.mapper[target])
- return new_id
- def create_nodevalue(self, value: Any) -> Optional[Node]:
- if not self.is_valid_datavalue(value):
- return None
- nid = self.create_node()
- self.mapper[nid][self.VALUE_ATTR] = value
- return nid
- def create_dict(self, source: Element, value: Any, target: Element) -> None:
- eid = self.create_edge(source, target)
- if eid is not None:
- nid = self.create_nodevalue(value)
- if nid is not None:
- self.create_edge(eid, nid)
- def read_root(self) -> Node:
- return self.root
- def read_value(self, node: Node) -> Optional[Any]:
- if node not in self.mapper or self.mapper[node][self.NODE_IS_EDGE_ATTR]:
- return None
- return self.mapper[node][self.VALUE_ATTR]
- def read_outgoing(self, elem: Element) -> Optional[List[Edge]]:
- if elem not in self.mapper:
- return None
- neighs = self.mapper[elem].neighbors(mode="out")
- return [Edge(n[self.NODE_LABEL_ATTR]) for n in neighs if n[self.NODE_IS_EDGE_ATTR] and n["source"] == elem]
- def read_incoming(self, elem: Element) -> Optional[List[Edge]]:
- if elem not in self.mapper:
- return None
- neighs = self.mapper[elem].neighbors(mode="in")
- return [Edge(n[self.NODE_LABEL_ATTR]) for n in neighs if n[self.NODE_IS_EDGE_ATTR] and n["target"] == elem]
- def read_edge(self, edge: Edge) -> Tuple[Optional[Node], Optional[Node]]:
- if edge in self.mapper and self.mapper[edge][self.NODE_IS_EDGE_ATTR]:
- e_ = self.mapper[edge]
- return e_["source"], e_["target"]
- return None, None
- def read_dict(self, elem: Element, value: Any) -> Optional[Element]:
- edge = self.read_dict_edge(elem, value)
- if edge is None:
- return None
- return self.mapper[edge]["target"]
- def read_dict_keys(self, elem: Element) -> Optional[List[Element]]:
- outgoing = self.read_outgoing(elem)
- if outgoing is None:
- return None
- res = set()
- for edge in outgoing:
- labels = self.read_outgoing(edge)
- if labels is None: continue
- for label in labels:
- res.add(self.mapper[label]["target"])
- return list(res)
- def read_dict_edge(self, elem: Element, value: Any) -> Optional[Edge]:
- outgoing = self.read_outgoing(elem)
- if outgoing is None:
- return None
- for edge in outgoing:
- labels = self.read_outgoing(edge)
- if labels is None: continue
- for label in labels:
- if self.read_value(self.mapper[label]["target"]) == value:
- return edge
- return None
- def read_dict_node(self, elem: Element, value_node: Node) -> Optional[Element]:
- edge = self.read_dict_node_edge(elem, value_node)
- if edge is None:
- return None
- return self.read_edge(edge)[1]
- def read_dict_node_edge(self, elem: Element, value_node: Node) -> Optional[Edge]:
- if value_node not in self.mapper or elem not in self.mapper:
- return None
- inc = self.read_incoming(value_node)
- out = self.read_outgoing(elem)
- if inc is None or out is None:
- return None
- for edge in out:
- inc2 = self.read_outgoing(edge)
- if inc2 is None: continue
- for i in inc2:
- if i in inc:
- return edge
- return None
- def read_reverse_dict(self, elem: Element, value: Any) -> Optional[List[Element]]:
- incoming = self.read_incoming(elem)
- if incoming is None:
- return None
- res = []
- for edge in incoming:
- labels = self.read_outgoing(edge)
- if labels is None: continue
- for label in labels:
- if self.read_value(self.mapper[label]["target"]) == value:
- res.append(self.mapper[edge]["source"])
- return res
- def delete_node(self, node: Node) -> None:
- if node in self.mapper:
- # Remove all the edges of this node
- edges = self.read_outgoing(node) + self.read_incoming(node)
- for o in edges:
- self.delete_edge(o)
- # delete the vertex
- self.graph.delete_vertices(self.mapper[node].index)
- # Reset the map, because otherwise weird stuff happens
- self._reset_map()
- def delete_edge(self, edge: Edge) -> None:
- if edge in self.mapper and self.mapper[edge][self.NODE_IS_EDGE_ATTR]:
- # Remove all the edges of this node
- edges = self.read_outgoing(edge) + self.read_incoming(edge)
- for o in edges:
- self.delete_node(o)
- # Reset the map, because otherwise weird stuff happens
- self._reset_map()
- # Delete the Edge
- if edge in self.mapper:
- self.graph.delete_vertices(self.mapper[edge].index)
- # Reset the map, because otherwise weird stuff happens
- self._reset_map()
- def _reset_map(self):
- """
- Removing vertexes will result in inconsistent references in the internal map.
- This function must be called after each removal.
- """
- self.mapper.clear()
- for vertex in self.graph.vs:
- self.mapper[Element(vertex[self.NODE_LABEL_ATTR])] = vertex
- for node in self.mapper.values():
- if node[self.NODE_IS_EDGE_ATTR]:
- if node["source"] not in self.mapper:
- node["source"] = None
- if node["target"] not in self.mapper:
- node["target"] = None
- if __name__ == '__main__':
- state = iGraphState()
- a = state.create_node()
- print("a", a)
- b = state.create_node()
- print("b", b)
- c = state.create_edge(a, b)
- print("c", c)
- d = state.create_edge(c, a)
- print("d", d)
- e = state.create_edge(c, b)
- print("e", e)
- f = state.create_edge(c, d)
- print("f", f)
- ig.plot(state.graph, target="test.png")
|