igraphstate.py 6.2 KB


  1. from typing import Any, Optional, List, Tuple
  2. import igraph as ig
  3. from state.base import State, Node, Edge, Element
  4. class iGraphState(State):
  5. """
  6. Use igraph as ModelverseState.
  7. """
  8. NODE_IS_EDGE_ATTR = "edge"
  9. NODE_LABEL_ATTR = "label"
  10. VALUE_ATTR = "value"
  11. def __init__(self):
  12. self.graph = ig.Graph(directed=True)
  13. self.mapper = {}
  14. self.root = self.create_node()
  15. def __repr__(self):
  16. return "iGraphState {%s}" % str(list(self.mapper.keys()))
  17. def create_node(self) -> Node:
  18. new_id = self.new_id()
  19. self.mapper[new_id] = self.graph.add_vertex()
  20. self.mapper[new_id][self.NODE_LABEL_ATTR] = str(new_id)
  21. self.mapper[new_id][self.NODE_IS_EDGE_ATTR] = False
  22. self.mapper[new_id][self.VALUE_ATTR] = None
  23. return new_id
  24. def create_edge(self, source: Element, target: Element) -> Optional[Edge]:
  25. if source not in self.mapper:
  26. return None
  27. if target not in self.mapper:
  28. return None
  29. new_id = self.create_node()
  30. self.mapper[new_id][self.NODE_IS_EDGE_ATTR] = True
  31. self.mapper[new_id]["source"] = source
  32. self.mapper[new_id]["target"] = target
  33. self.graph.add_edge(self.mapper[source], self.mapper[new_id])
  34. self.graph.add_edge(self.mapper[new_id], self.mapper[target])
  35. return new_id
  36. def create_nodevalue(self, value: Any) -> Optional[Node]:
  37. if not self.is_valid_datavalue(value):
  38. return None
  39. nid = self.create_node()
  40. self.mapper[nid][self.VALUE_ATTR] = value
  41. return nid
  42. def create_dict(self, source: Element, value: Any, target: Element) -> None:
  43. eid = self.create_edge(source, target)
  44. if eid is not None:
  45. nid = self.create_nodevalue(value)
  46. if nid is not None:
  47. self.create_edge(eid, nid)
  48. def read_root(self) -> Node:
  49. return self.root
  50. def read_value(self, node: Node) -> Optional[Any]:
  51. if node not in self.mapper or self.mapper[node][self.NODE_IS_EDGE_ATTR]:
  52. return None
  53. return self.mapper[node][self.VALUE_ATTR]
  54. def read_outgoing(self, elem: Element) -> Optional[List[Edge]]:
  55. if elem not in self.mapper:
  56. return None
  57. neighs = self.mapper[elem].neighbors(mode="out")
  58. return [Edge(n[self.NODE_LABEL_ATTR]) for n in neighs if n[self.NODE_IS_EDGE_ATTR] and n["source"] == elem]
  59. def read_incoming(self, elem: Element) -> Optional[List[Edge]]:
  60. if elem not in self.mapper:
  61. return None
  62. neighs = self.mapper[elem].neighbors(mode="in")
  63. return [Edge(n[self.NODE_LABEL_ATTR]) for n in neighs if n[self.NODE_IS_EDGE_ATTR] and n["target"] == elem]
  64. def read_edge(self, edge: Edge) -> Tuple[Optional[Node], Optional[Node]]:
  65. if edge in self.mapper and self.mapper[edge][self.NODE_IS_EDGE_ATTR]:
  66. e_ = self.mapper[edge]
  67. return e_["source"], e_["target"]
  68. return None, None
  69. def read_dict(self, elem: Element, value: Any) -> Optional[Element]:
  70. edge = self.read_dict_edge(elem, value)
  71. if edge is None:
  72. return None
  73. return self.mapper[edge]["target"]
  74. def read_dict_keys(self, elem: Element) -> Optional[List[Element]]:
  75. outgoing = self.read_outgoing(elem)
  76. if outgoing is None:
  77. return None
  78. res = set()
  79. for edge in outgoing:
  80. labels = self.read_outgoing(edge)
  81. if labels is None: continue
  82. for label in labels:
  83. res.add(self.mapper[label]["target"])
  84. return list(res)
  85. def read_dict_edge(self, elem: Element, value: Any) -> Optional[Edge]:
  86. outgoing = self.read_outgoing(elem)
  87. if outgoing is None:
  88. return None
  89. for edge in outgoing:
  90. labels = self.read_outgoing(edge)
  91. if labels is None: continue
  92. for label in labels:
  93. if self.read_value(self.mapper[label]["target"]) == value:
  94. return edge
  95. return None
  96. def read_dict_node(self, elem: Element, value_node: Node) -> Optional[Element]:
  97. edge = self.read_dict_node_edge(elem, value_node)
  98. if edge is None:
  99. return None
  100. return self.read_edge(edge)[1]
  101. def read_dict_node_edge(self, elem: Element, value_node: Node) -> Optional[Edge]:
  102. if value_node not in self.mapper or elem not in self.mapper:
  103. return None
  104. inc = self.read_incoming(value_node)
  105. out = self.read_outgoing(elem)
  106. if inc is None or out is None:
  107. return None
  108. for edge in out:
  109. inc2 = self.read_outgoing(edge)
  110. if inc2 is None: continue
  111. for i in inc2:
  112. if i in inc:
  113. return edge
  114. return None
  115. def read_reverse_dict(self, elem: Element, value: Any) -> Optional[List[Element]]:
  116. incoming = self.read_incoming(elem)
  117. if incoming is None:
  118. return None
  119. res = []
  120. for edge in incoming:
  121. labels = self.read_outgoing(edge)
  122. if labels is None: continue
  123. for label in labels:
  124. if self.read_value(self.mapper[label]["target"]) == value:
  125. res.append(self.mapper[edge]["source"])
  126. return res
  127. def delete_node(self, node: Node) -> None:
  128. if node in self.mapper:
  129. # Remove all the edges of this node
  130. edges = self.read_outgoing(node) + self.read_incoming(node)
  131. for o in edges:
  132. self.delete_edge(o)
  133. # delete the vertex
  134. self.graph.delete_vertices(self.mapper[node].index)
  135. # Reset the map, because otherwise weird stuff happens
  136. self._reset_map()
  137. def delete_edge(self, edge: Edge) -> None:
  138. if edge in self.mapper and self.mapper[edge][self.NODE_IS_EDGE_ATTR]:
  139. # Remove all the edges of this node
  140. edges = self.read_outgoing(edge) + self.read_incoming(edge)
  141. for o in edges:
  142. self.delete_node(o)
  143. # Reset the map, because otherwise weird stuff happens
  144. self._reset_map()
  145. # Delete the Edge
  146. if edge in self.mapper:
  147. self.graph.delete_vertices(self.mapper[edge].index)
  148. # Reset the map, because otherwise weird stuff happens
  149. self._reset_map()
  150. def _reset_map(self):
  151. """
  152. Removing vertexes will result in inconsistent references in the internal map.
  153. This function must be called after each removal.
  154. """
  155. self.mapper.clear()
  156. for vertex in self.graph.vs:
  157. self.mapper[Element(vertex[self.NODE_LABEL_ATTR])] = vertex
  158. for node in self.mapper.values():
  159. if node[self.NODE_IS_EDGE_ATTR]:
  160. if node["source"] not in self.mapper:
  161. node["source"] = None
  162. if node["target"] not in self.mapper:
  163. node["target"] = None
  164. if __name__ == '__main__':
  165. state = iGraphState()
  166. a = state.create_node()
  167. print("a", a)
  168. b = state.create_node()
  169. print("b", b)
  170. c = state.create_edge(a, b)
  171. print("c", c)
  172. d = state.create_edge(c, a)
  173. print("d", d)
  174. e = state.create_edge(c, b)
  175. print("e", e)
  176. f = state.create_edge(c, d)
  177. print("f", f)
  178. ig.plot(state.graph, target="test.png")