Browse Source

Added back RDF and Neo4j implementations

Andrei Bondarenko 2 years ago
parent
commit
633aa84f39
4 changed files with 586 additions and 7 deletions
  1. 3 1
      requirements.txt
  2. 301 0
      state/neo4jstate.py
  3. 276 0
      state/rdfstate.py
  4. 6 6
      state/test/fixtures/state.py

+ 3 - 1
requirements.txt

@@ -1 +1,3 @@
-pytest==6.2.4
+pytest==6.2.4
+neo4j==4.3.4
+rdflib==6.0.0

+ 301 - 0
state/neo4jstate.py

@@ -0,0 +1,301 @@
+from typing import Any, Optional, List, Tuple, Callable, Generator
+from neo4j import GraphDatabase
+from ast import literal_eval
+
+from .base import State, Edge, Node, Element, UUID
+
+
+
+class Neo4jState(State):
+    def __init__(self, uri="bolt://localhost:7687", user="neo4j", password="tests"):
+        """Open a driver for the Neo4j server at ``uri`` and create the root node.
+
+        ``user`` and ``password`` are passed to the driver as basic auth
+        credentials; the defaults target a Bolt endpoint on localhost.
+        """
+        self.driver = GraphDatabase.driver(uri, auth=(user, password))
+        # The root is created eagerly; read_root() returns this id.
+        self.root = self.create_node()
+
+    def close(self, *, clear=False):
+        """Close the driver connection.
+
+        When ``clear`` is true, the ``_clear`` transaction (detach-delete of
+        every node) is run before the driver is closed.
+        """
+        if clear:
+            self._run_and_return(self._clear)
+        self.driver.close()
+
+    def _run_and_return(self, query: Callable, **kwargs):
+        with self.driver.session() as session:
+            result = session.write_transaction(query, **kwargs)
+            return result
+
+    @staticmethod
+    def _clear(tx):
+        """Transaction function that detach-deletes every node in the database."""
+        tx.run("MATCH (n) "
+               "DETACH DELETE n")
+
+    @staticmethod
+    def _existence_check(tx, eid, label="Element"):
+        result = tx.run(f"MATCH (elem:{label}) "
+                        "WHERE elem.id = $eid "
+                        "RETURN elem.id",
+                        eid=eid)
+        try:
+            return result.single()[0]
+        except TypeError:
+            # No node found for nid
+            # ergo, no edge created
+            return None
+
+    def create_node(self) -> Node:
+        def query(tx, nid):
+            result = tx.run("CREATE (n:Element:Node) "
+                            "SET n.id = $nid "
+                            "RETURN n.id",
+                            nid=nid)
+            return result.single()[0]
+
+        node = self._run_and_return(query, nid=str(self.new_id()))
+        return UUID(node) if node is not None else None
+
+    def create_edge(self, source: Element, target: Element) -> Optional[Edge]:
+        def query(tx, eid, sid, tid):
+            result = tx.run("MATCH (source), (target) "
+                            "WHERE source.id = $sid AND target.id = $tid "
+                            "CREATE (source) -[:Source]-> (e:Element:Edge) -[:Target]-> (target) "
+                            "SET e.id = $eid "
+                            "RETURN e.id",
+                            eid=eid, sid=sid, tid=tid)
+            try:
+                return result.single()[0]
+            except TypeError:
+                # No node found for sid and/or tid
+                # ergo, no edge created
+                return None
+
+        edge = self._run_and_return(query, eid=str(self.new_id()), sid=str(source), tid=str(target))
+        return UUID(edge) if edge is not None else None
+
+    def create_nodevalue(self, value: Any) -> Optional[Node]:
+        def query(tx, nid, val):
+            result = tx.run("CREATE (n:Element:Node) "
+                            "SET n.id = $nid, n.value = $val "
+                            "RETURN n.id",
+                            nid=nid, val=val)
+            return result.single()[0]
+
+        if not self.is_valid_datavalue(value):
+            return None
+
+        node = self._run_and_return(query, nid=str(self.new_id()), val=repr(value))
+        return UUID(node) if node is not None else None
+
+    def create_dict(self, source: Element, value: Any, target: Element) -> Optional[Tuple[Edge, Edge, Node]]:
+        if not self.is_valid_datavalue(value):
+            return None
+
+        edge_node = self.create_edge(source, target)
+        val_node = self.create_nodevalue(value)
+        if edge_node is not None and val_node is not None:
+            self.create_edge(edge_node, val_node)
+
+    def read_root(self) -> Node:
+        """Return the id of the root node created in ``__init__``."""
+        return self.root
+
+    def read_value(self, node: Node) -> Optional[Any]:
+        def query(tx, nid):
+            result = tx.run("MATCH (n:Node) "
+                            "WHERE n.id = $nid "
+                            "RETURN n.value",
+                            nid=nid)
+            try:
+                return result.single()[0]
+            except TypeError:
+                # No node found for nid
+                return None
+
+        value = self._run_and_return(query, nid=str(node))
+        return literal_eval(value) if value is not None else None
+
+    def read_outgoing(self, elem: Element) -> Optional[List[Edge]]:
+        def query(tx, eid):
+            result = tx.run("MATCH (elem:Element) -[:Source]-> (e:Edge) "
+                            "WHERE elem.id = $eid "
+                            "RETURN e.id",
+                            eid=eid)
+            return result.value()
+
+        source_exists = self._run_and_return(self._existence_check, eid=str(elem)) is not None
+        if source_exists:
+            result = self._run_and_return(query, eid=str(elem))
+            return [UUID(x) for x in result] if result is not None else None
+
+    def read_incoming(self, elem: Element) -> Optional[List[Edge]]:
+        def query(tx, eid):
+            result = tx.run("MATCH (elem:Element) <-[:Target]- (e:Edge) "
+                            "WHERE elem.id = $eid "
+                            "RETURN e.id",
+                            eid=eid)
+            return result.value()
+
+        target_exists = self._run_and_return(self._existence_check, eid=str(elem)) is not None
+        if target_exists:
+            result = self._run_and_return(query, eid=str(elem))
+            return [UUID(x) for x in result] if result is not None else None
+
+    def read_edge(self, edge: Edge) -> Tuple[Optional[Node], Optional[Node]]:
+        def query(tx, eid):
+            result = tx.run("MATCH (src) -[:Source]-> (e:Edge) -[:Target]-> (tgt)"
+                            "WHERE e.id = $eid "
+                            "RETURN src.id, tgt.id",
+                            eid=eid)
+            return result.single()
+
+        edge_exists = self._run_and_return(self._existence_check, eid=str(edge), label="Edge") is not None
+        if edge_exists:
+            try:
+                src, tgt = self._run_and_return(query, eid=str(edge))
+                return UUID(src), UUID(tgt)
+            except TypeError:
+                return None, None
+        else:
+            return None, None
+
+    def read_dict(self, elem: Element, value: Any) -> Optional[Element]:
+        def query(tx, eid, label_value):
+            result = tx.run("MATCH (src) -[:Source]-> (e:Edge) -[:Target]-> (tgt), "
+                            "(e) -[:Source]-> (:Edge) -[:Target]-> (label)"
+                            "WHERE src.id = $eid "
+                            "AND label.value = $val "
+                            "RETURN tgt.id",
+                            eid=eid, val=label_value)
+            try:
+                return result.single()[0]
+            except TypeError:
+                # No edge found with given label
+                return None
+
+        elem_exists = self._run_and_return(self._existence_check, eid=str(elem)) is not None
+        if elem_exists:
+            if isinstance(value, UUID):
+                return None
+            result = self._run_and_return(query, eid=str(elem), label_value=repr(value))
+            return UUID(result) if result is not None else None
+
+    def read_dict_keys(self, elem: Element) -> Optional[List[Any]]:
+        def query(tx, eid):
+            result = tx.run("MATCH (src) -[:Source]-> (e:Edge) -[:Target]-> (), "
+                            "(e) -[:Source]-> (:Edge) -[:Target]-> (label)"
+                            "WHERE src.id = $eid "
+                            "RETURN label.id",
+                            eid=eid)
+            try:
+                return result.value()
+            except TypeError:
+                # No edge found with given label
+                return None
+
+        elem_exists = self._run_and_return(self._existence_check, eid=str(elem)) is not None
+        if elem_exists:
+            result = self._run_and_return(query, eid=str(elem))
+            return [UUID(x) for x in result if x is not None]
+
+    def read_dict_edge(self, elem: Element, value: Any) -> Optional[Edge]:
+        def query(tx, eid, label_value):
+            result = tx.run("MATCH (src) -[:Source]-> (e:Edge) -[:Target]-> (), "
+                            "(e) -[:Source]-> (:Edge) -[:Target]-> (label)"
+                            "WHERE src.id = $eid "
+                            "AND label.value = $val "
+                            "RETURN e.id",
+                            eid=eid, val=label_value)
+            try:
+                return result.single()[0]
+            except TypeError:
+                # No edge found with given label
+                return None
+
+        elem_exists = self._run_and_return(self._existence_check, eid=str(elem)) is not None
+        if elem_exists:
+            result = self._run_and_return(query, eid=str(elem), label_value=repr(value))
+            return UUID(result) if result is not None else None
+
+    def read_dict_node(self, elem: Element, value_node: Node) -> Optional[Element]:
+        def query(tx, eid, label_id):
+            result = tx.run("MATCH (src) -[:Source]-> (e:Edge) -[:Target]-> (tgt), "
+                            "(e) -[:Source]-> (:Edge) -[:Target]-> (label)"
+                            "WHERE src.id = $eid "
+                            "AND label.id = $lid "
+                            "RETURN tgt.id",
+                            eid=eid, lid=label_id)
+            try:
+                return result.single()[0]
+            except TypeError:
+                # No edge found with given label
+                return None
+
+        elem_exists = self._run_and_return(self._existence_check, eid=str(elem)) is not None
+        if elem_exists:
+            result = self._run_and_return(query, eid=str(elem), label_id=str(value_node))
+            return UUID(result) if result is not None else None
+
+    def read_dict_node_edge(self, elem: Element, value_node: Node) -> Optional[Edge]:
+        def query(tx, eid, label_id):
+            result = tx.run("MATCH (src) -[:Source]-> (e:Edge) -[:Target]-> (), "
+                            "(e) -[:Source]-> (:Edge) -[:Target]-> (label)"
+                            "WHERE src.id = $eid "
+                            "AND label.id = $lid "
+                            "RETURN e.id",
+                            eid=eid, lid=label_id)
+            try:
+                return result.single()[0]
+            except TypeError:
+                # No edge found with given label
+                return None
+
+        elem_exists = self._run_and_return(self._existence_check, eid=str(elem)) is not None
+        if elem_exists:
+            result = self._run_and_return(query, eid=str(elem), label_id=str(value_node))
+            return UUID(result) if result is not None else None
+
+    def read_reverse_dict(self, elem: Element, value: Any) -> Optional[List[Element]]:
+        def query(tx, eid, label_value):
+            result = tx.run("MATCH (src) -[:Source]-> (e:Edge) -[:Target]-> (tgt), "
+                            "(e) -[:Source]-> (:Edge) -[:Target]-> (label)"
+                            "WHERE tgt.id = $eid "
+                            "AND label.value = $val "
+                            "RETURN src.id",
+                            eid=eid, val=label_value)
+            try:
+                return result.value()
+            except TypeError:
+                # No edge found with given label
+                return None
+
+        elem_exists = self._run_and_return(self._existence_check, eid=str(elem)) is not None
+        if elem_exists:
+            result = self._run_and_return(query, eid=str(elem), label_value=repr(value))
+            return [UUID(x) for x in result if x is not None]
+
+    def delete_node(self, node: Node) -> None:
+        def query(tx, nid):
+            result = tx.run("MATCH (n:Node) "
+                            "WHERE n.id = $nid "
+                            "OPTIONAL MATCH (n) -- (e:Edge) "
+                            "DETACH DELETE n "
+                            "RETURN e.id",
+                            nid=nid)
+            return result.value()
+
+        to_be_deleted = self._run_and_return(query, nid=str(node))
+        to_be_deleted = [UUID(x) for x in to_be_deleted if x is not None]
+        for edge in to_be_deleted:
+            self.delete_edge(edge)
+
+    def delete_edge(self, edge: Edge) -> None:
+        def query(tx, eid):
+            result = tx.run("MATCH (e1:Edge) "
+                            "WHERE e1.id = $eid "
+                            "OPTIONAL MATCH (e1) -- (e2:Edge) "
+                            "WHERE (e1) -[:Source]-> (e2) "
+                            "OR (e1) <-[:Target]- (e2) "
+                            "DETACH DELETE e1 "
+                            "RETURN e2.id",
+                            eid=eid)
+            return result.value()
+
+        to_be_deleted = self._run_and_return(query, eid=str(edge))
+        to_be_deleted = [UUID(x) for x in to_be_deleted if x is not None]
+        for edge in to_be_deleted:
+            self.delete_edge(edge)

+ 276 - 0
state/rdfstate.py

@@ -0,0 +1,276 @@
+from typing import Any, List, Tuple, Optional, Generator
+from rdflib import Graph, Namespace, URIRef, Literal
+from rdflib.plugins.sparql import prepareQuery
+import json
+
+from .base import State
+
+# Define graph data structures used by implementation
+# Use NewType to create distinct type or just create a type alias
+Element = URIRef
+Node = URIRef
+Edge = URIRef
+
+
+class RDFState(State):
+    def __init__(self, namespace_uri="http://modelverse.mv/#"):
+        self.graph = Graph()
+        self.namespace_uri = namespace_uri
+        self.mv = Namespace(namespace_uri)
+        self.graph.bind("MV", self.mv)
+        self.prepared_queries = {
+            "read_value": """
+                            SELECT ?value
+                            WHERE {
+                                ?var1 MV:hasValue ?value .
+                            }
+                          """,
+            "read_outgoing": """
+                            SELECT ?link
+                            WHERE {
+                                ?link MV:hasSource ?var1 .
+                            }
+                            """,
+            "read_incoming": """
+                            SELECT ?link
+                            WHERE {
+                                ?link MV:hasTarget ?var1 .
+                            }
+                            """,
+            "read_edge": """
+                            SELECT ?source ?target
+                            WHERE {
+                                ?var1 MV:hasSource ?source ;
+                                      MV:hasTarget ?target .
+                            }
+                            """,
+            "read_dict_keys": """
+                            SELECT ?key
+                            WHERE {
+                                ?main_edge MV:hasSource ?var1 .
+                                ?attr_edge MV:hasSource ?main_edge ;
+                                           MV:hasTarget ?key .
+                            }
+                            """,
+            "read_dict_node": """
+                            SELECT ?value_node
+                            WHERE {
+                                ?main_edge MV:hasSource ?var1 ;
+                                           MV:hasTarget ?value_node .
+                                ?attr_edge MV:hasSource ?main_edge ;
+                                           MV:hasTarget ?var2 .
+                            }
+                            """,
+            "read_dict_node_edge": """
+                            SELECT ?main_edge
+                            WHERE {
+                                ?main_edge MV:hasSource ?var1 .
+                                ?attr_edge MV:hasSource ?main_edge ;
+                                           MV:hasTarget ?var2 .
+                            }
+                            """,
+            "delete_node": """
+                            SELECT ?edge
+                            WHERE {
+                                { ?edge MV:hasTarget ?var1 . }
+                                UNION
+                                { ?edge MV:hasSource ?var1 . }
+                            }
+                            """,
+            "delete_edge": """
+                            SELECT ?edge
+                            WHERE {
+                                { ?edge MV:hasTarget ?var1 . }
+                                UNION
+                                { ?edge MV:hasSource ?var1 . }
+                            }
+                            """,
+        }
+        self.garbage = set()
+
+        for k, v in list(self.prepared_queries.items()):
+            self.prepared_queries[k] = prepareQuery(self.prepared_queries[k], initNs={"MV": self.mv})
+
+        self.root = self.create_node()
+
+    def create_node(self) -> Node:
+        """Return a fresh node URI built from the namespace and a new id.
+
+        No triple is added to the graph for a value-less node.
+        """
+        return URIRef(self.namespace_uri + str(self.new_id()))
+
+    def create_edge(self, source: Element, target: Element) -> Optional[Edge]:
+        if not isinstance(source, URIRef):
+            return None
+        elif not isinstance(target, URIRef):
+            return None
+        edge = URIRef(self.namespace_uri + str(self.new_id()))
+        self.graph.add((edge, self.mv.hasSource, source))
+        self.graph.add((edge, self.mv.hasTarget, target))
+        return edge
+
+    def create_nodevalue(self, value: Any) -> Optional[Node]:
+        if not self.is_valid_datavalue(value):
+            return None
+        node = URIRef(self.namespace_uri + str(self.new_id()))
+        if isinstance(value, tuple):
+            value = {"Type": value[0]}
+        self.graph.add((node, self.mv.hasValue, Literal(json.dumps(value))))
+        return node
+
+    def create_dict(self, source: Element, value: Any, target: Element) -> Optional[Tuple[Edge, Edge, Node]]:
+        if not isinstance(source, URIRef):
+            return
+        if not isinstance(target, URIRef):
+            return
+        if not self.is_valid_datavalue(value):
+            return
+
+        n = self.create_nodevalue(value)
+        e = self.create_edge(source, target)
+        self.create_edge(e, n)
+
+    def read_root(self) -> Node:
+        """Return the URI of the root node created in ``__init__``."""
+        return self.root
+
+    def read_value(self, node: Node) -> Optional[Any]:
+        if not isinstance(node, URIRef) or not (node, None, None) in self.graph:
+            return None
+        result = self.graph.query(self.prepared_queries["read_value"], initBindings={"var1": node})
+        if len(result) == 0:
+            return None
+        result = json.loads(list(result)[0][0])
+        return result if not isinstance(result, dict) else (result["Type"],)
+
+    def read_outgoing(self, elem: Element) -> Optional[List[Edge]]:
+        if not isinstance(elem, URIRef) or elem in self.garbage:
+            return None
+        result = self.graph.query(self.prepared_queries["read_outgoing"], initBindings={"var1": elem})
+        return [i[0] for i in result]
+
+    def read_incoming(self, elem: Element) -> Optional[List[Edge]]:
+        if not isinstance(elem, URIRef) or elem in self.garbage:
+            return None
+        result = self.graph.query(self.prepared_queries["read_incoming"], initBindings={"var1": elem})
+        return [i[0] for i in result]
+
+    def read_edge(self, edge: Edge) -> Tuple[Optional[Node], Optional[Node]]:
+        if not isinstance(edge, URIRef) or not (edge, None, None) in self.graph:
+            return None, None
+        result = self.graph.query(self.prepared_queries["read_edge"], initBindings={"var1": edge})
+        if len(result) == 0:
+            return None, None
+        else:
+            return list(result)[0][0], list(result)[0][1]
+
+    def read_dict(self, elem: Element, value: Any) -> Optional[Element]:
+        if not isinstance(elem, URIRef):
+            return None
+        q = f"""
+            SELECT ?value_node
+            WHERE {{
+                ?main_edge MV:hasSource <{elem}> ;
+                           MV:hasTarget ?value_node .
+                ?attr_edge MV:hasSource ?main_edge ;
+                           MV:hasTarget ?attr_node .
+                ?attr_node MV:hasValue '{json.dumps(value)}' .
+            }}
+            """
+        result = self.graph.query(q)
+        if len(result) == 0:
+            return None
+        return list(result)[0][0]
+
+    def read_dict_keys(self, elem: Element) -> Optional[List[Any]]:
+        if not isinstance(elem, URIRef):
+            return None
+        result = self.graph.query(self.prepared_queries["read_dict_keys"], initBindings={"var1": elem})
+        return [i[0] for i in result]
+
+    def read_dict_edge(self, elem: Element, value: Any) -> Optional[Edge]:
+        if not isinstance(elem, URIRef):
+            return None
+        result = self.graph.query(
+            f"""
+            SELECT ?main_edge
+            WHERE {{
+                ?main_edge MV:hasSource <{elem}> ;
+                           MV:hasTarget ?value_node .
+                ?attr_edge MV:hasSource ?main_edge ;
+                           MV:hasTarget ?attr_node .
+                ?attr_node MV:hasValue '{json.dumps(value)}' .
+            }}
+            """)
+        if len(result) == 0:
+            return None
+        return list(result)[0][0]
+
+    def read_dict_node(self, elem: Element, value_node: Node) -> Optional[Element]:
+        if not isinstance(elem, URIRef):
+            return None
+        if not isinstance(value_node, URIRef):
+            return None
+        result = self.graph.query(
+            self.prepared_queries["read_dict_node"], initBindings={"var1": elem, "var2": value_node}
+        )
+        if len(result) == 0:
+            return None
+        return list(result)[0][0]
+
+    def read_dict_node_edge(self, elem: Element, value_node: Node) -> Optional[Edge]:
+        if not isinstance(elem, URIRef):
+            return None
+        if not isinstance(value_node, URIRef):
+            return None
+        result = self.graph.query(
+            self.prepared_queries["read_dict_node_edge"], initBindings={"var1": elem, "var2": value_node}
+        )
+        if len(result) == 0:
+            return None
+        return list(result)[0][0]
+
+    def read_reverse_dict(self, elem: Element, value: Any) -> Optional[List[Element]]:
+        if not isinstance(elem, URIRef):
+            return None
+        result = self.graph.query(
+            f"""
+            SELECT ?source_node
+            WHERE {{
+                ?main_edge MV:hasTarget <{elem}> ;
+                           MV:hasSource ?source_node .
+                ?attr_edge MV:hasSource ?main_edge ;
+                           MV:hasTarget ?value_node .
+                ?value_node MV:hasValue '{json.dumps(value)}' .
+            }}
+            """)
+
+        return [i[0] for i in result]
+
+    def delete_node(self, node: Node) -> None:
+        if node == self.root:
+            return
+        if not isinstance(node, URIRef):
+            return
+        # Check whether node isn't an edge
+        if (node, self.mv.hasSource, None) in self.graph or (node, self.mv.hasTarget, None) in self.graph:
+            return
+        # Remove its value if it exists
+        self.graph.remove((node, None, None))
+        # Get all edges connecting this
+        result = self.graph.query(self.prepared_queries["delete_node"], initBindings={"var1": node})
+        # ... and remove them
+        for e in result:
+            self.delete_edge(e[0])
+        self.garbage.add(node)
+
+    def delete_edge(self, edge: Edge) -> None:
+        if not isinstance(edge, URIRef):
+            return
+        # Check whether edge is actually an edge
+        if not ((edge, self.mv.hasSource, None) in self.graph and (edge, self.mv.hasTarget, None) in self.graph):
+            return
+        # Remove its links
+        self.graph.remove((edge, None, None))
+        # Get all edges connecting this
+        result = self.graph.query(self.prepared_queries["delete_edge"], initBindings={"var1": edge})
+        # ... and remove them
+        for e in result:
+            self.delete_edge(e[0])
+        self.garbage.add(edge)

+ 6 - 6
state/test/fixtures/state.py

@@ -1,13 +1,13 @@
 import pytest
 from state.pystate import PyState
-# from state.rdfstate import RDFState
-# from state.neo4jstate import Neo4jState
+from state.rdfstate import RDFState
+from state.neo4jstate import Neo4jState
 
 
 @pytest.fixture(params=[
     (PyState,),
-    # (RDFState, "http://example.org/#"),
-    # (Neo4jState,)
+    (RDFState, "http://example.org/#"),
+    (Neo4jState,)
 ])
 def state(request):
     if len(request.param) > 1:
@@ -15,5 +15,5 @@ def state(request):
     else:
         state = request.param[0]()
     yield state
-    # if isinstance(state, Neo4jState):
-    #     state.close(clear=True)
+    if isinstance(state, Neo4jState):
+        state.close(clear=True)