V0.py 3.2 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889909192939495
  1. from uuid import UUID
  2. from state.base import State
  3. from typing import Any, List
  4. class Bottom:
  5. def __init__(self, state: State):
  6. self.state = state
  7. def create_node(self, value=None) -> UUID:
  8. if value is None:
  9. return self.state.create_node()
  10. else:
  11. return self.state.create_nodevalue(value)
  12. def create_edge(self, source: UUID, target: UUID, label=None):
  13. if label is None:
  14. return self.state.create_edge(source, target)
  15. else:
  16. return self.state.create_dict(source, label, target)
  17. def read_value(self, node: UUID) -> Any:
  18. return self.state.read_value(node)
  19. def read_edge_source(self, edge: UUID) -> UUID:
  20. result = self.state.read_edge(edge)
  21. return result[0] if result is not None else result
  22. def read_edge_target(self, edge: UUID) -> UUID:
  23. result = self.state.read_edge(edge)
  24. return result[1] if result is not None else result
  25. def read_incoming_edges(self, target: UUID, label=None) -> List[UUID]:
  26. def read_label(_edge: UUID):
  27. try:
  28. label_edge, = self.state.read_outgoing(_edge)
  29. _, tgt = self.state.read_edge(label_edge)
  30. _label = self.state.read_value(tgt)
  31. return _label
  32. except (TypeError, ValueError):
  33. return None
  34. edges = self.state.read_incoming(target)
  35. if edges is None:
  36. return []
  37. if label is not None:
  38. edges = [e for e in edges if read_label(e) == label]
  39. return edges
  40. def read_outgoing_edges(self, source: UUID, label=None) -> List[UUID]:
  41. def read_label(_edge: UUID):
  42. try:
  43. label_edge, = self.state.read_outgoing(_edge)
  44. _, tgt = self.state.read_edge(label_edge)
  45. _label = self.state.read_value(tgt)
  46. return _label
  47. except (TypeError, ValueError):
  48. return None
  49. edges = self.state.read_outgoing(source)
  50. if edges is None:
  51. return []
  52. if label is not None:
  53. edges = [e for e in edges if read_label(e) == label]
  54. return edges
  55. def read_incoming_nodes(self, target: UUID, label=None) -> List[UUID]:
  56. edges = self.read_incoming_edges(target, label)
  57. if edges is None or len(edges) == 0:
  58. return []
  59. else:
  60. return [self.read_edge_source(e) for e in edges]
  61. def read_outgoing_nodes(self, source: UUID, label=None) -> List[UUID]:
  62. edges = self.read_outgoing_edges(source, label)
  63. if edges is None or len(edges) == 0:
  64. return []
  65. else:
  66. return [self.read_edge_target(e) for e in edges]
  67. def read_keys(self, element: UUID) -> List[str]:
  68. key_nodes = self.state.read_dict_keys(element)
  69. unique_keys = {self.state.read_value(node) for node in key_nodes}
  70. return list(unique_keys)
  71. def delete_element(self, element: UUID):
  72. src, tgt = self.state.read_edge(element)
  73. if src is None and tgt is None:
  74. # node
  75. self.state.delete_node(element)
  76. else:
  77. # edge
  78. self.state.delete_edge(element)