123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277 |
- from typing import Any, List, Tuple, Optional, Generator
- from rdflib import Graph, Namespace, URIRef, Literal
- from rdflib.plugins.sparql import prepareQuery
- import json
- from .base import State
- # Define the graph data structures used by this implementation.
- # Use NewType to create a distinct type, or just create a type alias.
- Element = URIRef  # any graph element (node or edge), identified by its URI
- Node = URIRef  # alias used in annotations for node identifiers
- Edge = URIRef  # alias used in annotations for edge identifiers
class RDFState(State):
    """State implementation that keeps the model graph in an rdflib ``Graph``.

    Representation (all predicates live in the ``MV`` namespace):

    * every element is a ``URIRef`` of the form ``namespace_uri + id``;
    * an edge ``e`` from ``s`` to ``t`` is the pair of triples
      ``(e, MV:hasSource, s)`` and ``(e, MV:hasTarget, t)``;
    * a node value is one triple ``(n, MV:hasValue, Literal(text))`` where
      ``text`` is the JSON encoding of the value;
    * a dictionary entry ``source[key] -> target`` is a "main" edge from
      ``source`` to ``target`` plus an "attribute" edge from that main edge
      to a value node holding ``key``.

    Plain nodes created by ``create_node`` add no triples to the graph.
    Deleted elements are recorded in ``self.garbage``, which
    ``read_outgoing`` and ``read_incoming`` consult.

    ``new_id`` and ``is_valid_datavalue`` are not defined in this class;
    presumably they are inherited from ``State`` -- confirm against base.py.
    """

    # SPARQL text for every query the class runs.  ``?var1`` / ``?var2`` are
    # placeholders that callers fill in through ``initBindings``.
    _QUERY_TEXTS = {
        "read_value": """
            SELECT ?value
            WHERE {
                ?var1 MV:hasValue ?value .
            }
        """,
        "read_outgoing": """
            SELECT ?link
            WHERE {
                ?link MV:hasSource ?var1 .
            }
        """,
        "read_incoming": """
            SELECT ?link
            WHERE {
                ?link MV:hasTarget ?var1 .
            }
        """,
        "read_edge": """
            SELECT ?source ?target
            WHERE {
                ?var1 MV:hasSource ?source ;
                      MV:hasTarget ?target .
            }
        """,
        "read_dict_keys": """
            SELECT ?key
            WHERE {
                ?main_edge MV:hasSource ?var1 .
                ?attr_edge MV:hasSource ?main_edge ;
                           MV:hasTarget ?key .
            }
        """,
        "read_dict_node": """
            SELECT ?value_node
            WHERE {
                ?main_edge MV:hasSource ?var1 ;
                           MV:hasTarget ?value_node .
                ?attr_edge MV:hasSource ?main_edge ;
                           MV:hasTarget ?var2 .
            }
        """,
        "read_dict_node_edge": """
            SELECT ?main_edge
            WHERE {
                ?main_edge MV:hasSource ?var1 .
                ?attr_edge MV:hasSource ?main_edge ;
                           MV:hasTarget ?var2 .
            }
        """,
        "delete_node": """
            SELECT ?edge
            WHERE {
                { ?edge MV:hasTarget ?var1 . }
                UNION
                { ?edge MV:hasSource ?var1 . }
            }
        """,
        "delete_edge": """
            SELECT ?edge
            WHERE {
                { ?edge MV:hasTarget ?var1 . }
                UNION
                { ?edge MV:hasSource ?var1 . }
            }
        """,
        # The three lookups below used to be assembled with f-strings; they
        # are prepared like the rest so the key is bound as a term, not text.
        "read_dict": """
            SELECT ?value_node
            WHERE {
                ?main_edge MV:hasSource ?var1 ;
                           MV:hasTarget ?value_node .
                ?attr_edge MV:hasSource ?main_edge ;
                           MV:hasTarget ?attr_node .
                ?attr_node MV:hasValue ?var2 .
            }
        """,
        "read_dict_edge": """
            SELECT ?main_edge
            WHERE {
                ?main_edge MV:hasSource ?var1 ;
                           MV:hasTarget ?value_node .
                ?attr_edge MV:hasSource ?main_edge ;
                           MV:hasTarget ?attr_node .
                ?attr_node MV:hasValue ?var2 .
            }
        """,
        "read_reverse_dict": """
            SELECT ?source_node
            WHERE {
                ?main_edge MV:hasTarget ?var1 ;
                           MV:hasSource ?source_node .
                ?attr_edge MV:hasSource ?main_edge ;
                           MV:hasTarget ?value_node .
                ?value_node MV:hasValue ?var2 .
            }
        """,
    }

    def __init__(self, namespace_uri="http://modelverse.mv/#"):
        """Create an empty graph, prepare all queries and create the root node.

        Args:
            namespace_uri: prefix of every element URI; also bound to the
                ``MV`` prefix used inside the SPARQL queries.
        """
        self.graph = Graph()
        self.namespace_uri = namespace_uri
        self.mv = Namespace(namespace_uri)
        self.graph.bind("MV", self.mv)
        # Parse each query once up front; later calls only supply bindings.
        self.prepared_queries = {
            name: prepareQuery(text, initNs={"MV": self.mv})
            for name, text in self._QUERY_TEXTS.items()
        }
        self.garbage = set()
        self.root = self.create_node()

    # ------------------------------------------------------------------
    # Private helpers
    # ------------------------------------------------------------------

    @staticmethod
    def _encode_value(value: Any) -> Literal:
        """Return the literal under which *value* is stored and looked up.

        A tuple is stored as the JSON object ``{"Type": value[0]}``; anything
        else is stored as its plain JSON encoding.  Every method that writes
        or matches a value must go through here so both sides agree.
        """
        if isinstance(value, tuple):
            value = {"Type": value[0]}
        return Literal(json.dumps(value))

    @staticmethod
    def _decode_value(literal: Any) -> Any:
        """Inverse of ``_encode_value``: JSON objects come back as 1-tuples."""
        decoded = json.loads(literal)
        if isinstance(decoded, dict):
            return (decoded["Type"],)
        return decoded

    def _new_uri(self) -> URIRef:
        """Mint a fresh element URI inside this state's namespace."""
        return URIRef(self.namespace_uri + str(self.new_id()))

    def _select(self, query_name: str, **bindings: Any) -> list:
        """Run a prepared query and return all of its rows as a list.

        The rows are materialised immediately so callers may modify the
        graph afterwards without touching a result that is still being
        produced.
        """
        query = self.prepared_queries[query_name]
        return list(self.graph.query(query, initBindings=bindings))

    # ------------------------------------------------------------------
    # Create
    # ------------------------------------------------------------------

    def create_node(self) -> Node:
        """Return the URI of a new node; no triple is added to the graph."""
        return self._new_uri()

    def create_edge(self, source: Element, target: Element) -> Optional[Edge]:
        """Create an edge from *source* to *target*.

        Returns:
            The new edge URI, or ``None`` when either endpoint is not a
            ``URIRef``.
        """
        if not isinstance(source, URIRef) or not isinstance(target, URIRef):
            return None
        edge = self._new_uri()
        self.graph.add((edge, self.mv.hasSource, source))
        self.graph.add((edge, self.mv.hasTarget, target))
        return edge

    def create_nodevalue(self, value: Any) -> Optional[Node]:
        """Create a node carrying *value*.

        Returns:
            The new node URI, or ``None`` when ``is_valid_datavalue`` rejects
            *value*.
        """
        if not self.is_valid_datavalue(value):
            return None
        node = self._new_uri()
        self.graph.add((node, self.mv.hasValue, self._encode_value(value)))
        return node

    def create_dict(self, source: Element, value: Any, target: Element) -> Optional[Tuple[Edge, Edge, Node]]:
        """Add the dictionary entry ``source[value] -> target``.

        Nothing is created when *source* or *target* is not a ``URIRef`` or
        when *value* is not a valid data value.

        NOTE(review): despite the annotation this method has always returned
        ``None``; that behaviour is kept here.  Confirm against the ``State``
        base class which of the two is intended.
        """
        if not isinstance(source, URIRef):
            return
        if not isinstance(target, URIRef):
            return
        if not self.is_valid_datavalue(value):
            return
        key_node = self.create_nodevalue(value)
        main_edge = self.create_edge(source, target)
        # The attribute edge hangs off the main edge and points at the key.
        self.create_edge(main_edge, key_node)

    # ------------------------------------------------------------------
    # Read
    # ------------------------------------------------------------------

    def read_root(self) -> Node:
        """Return the root node created in ``__init__``."""
        return self.root

    def read_value(self, node: Node) -> Optional[Any]:
        """Return the decoded value stored on *node*.

        Returns ``None`` when *node* is not a ``URIRef``, is not the subject
        of any triple, or has no ``MV:hasValue`` triple.
        """
        if not isinstance(node, URIRef) or (node, None, None) not in self.graph:
            return None
        rows = self._select("read_value", var1=node)
        if not rows:
            return None
        return self._decode_value(rows[0][0])

    def read_outgoing(self, elem: Element) -> Optional[List[Edge]]:
        """Return every edge whose source is *elem*.

        Returns ``None`` when *elem* is not a ``URIRef`` or has been deleted.
        """
        if not isinstance(elem, URIRef) or elem in self.garbage:
            return None
        return [row[0] for row in self._select("read_outgoing", var1=elem)]

    def read_incoming(self, elem: Element) -> Optional[List[Edge]]:
        """Return every edge whose target is *elem*.

        Returns ``None`` when *elem* is not a ``URIRef`` or has been deleted.
        """
        if not isinstance(elem, URIRef) or elem in self.garbage:
            return None
        return [row[0] for row in self._select("read_incoming", var1=elem)]

    def read_edge(self, edge: Edge) -> Tuple[Optional[Node], Optional[Node]]:
        """Return ``(source, target)`` of *edge*.

        Returns ``(None, None)`` when *edge* is not a ``URIRef``, is not the
        subject of any triple, or lacks a source/target pair.
        """
        if not isinstance(edge, URIRef) or (edge, None, None) not in self.graph:
            return None, None
        rows = self._select("read_edge", var1=edge)
        if not rows:
            return None, None
        first = rows[0]
        return first[0], first[1]

    def read_dict(self, elem: Element, value: Any) -> Optional[Element]:
        """Return the target of the dictionary entry ``elem[value]``.

        The key is bound as an RDF term rather than spliced into the query
        text, so keys containing quotes, backslashes or non-ASCII characters
        match what ``create_nodevalue`` stored.  Returns ``None`` when *elem*
        is not a ``URIRef`` or no such entry exists.
        """
        if not isinstance(elem, URIRef):
            return None
        rows = self._select("read_dict", var1=elem, var2=self._encode_value(value))
        if not rows:
            return None
        return rows[0][0]

    def read_dict_keys(self, elem: Element) -> Optional[List[Any]]:
        """Return the key *nodes* (not their values) of *elem*'s entries.

        Returns ``None`` when *elem* is not a ``URIRef``.
        """
        if not isinstance(elem, URIRef):
            return None
        return [row[0] for row in self._select("read_dict_keys", var1=elem)]

    def read_dict_edge(self, elem: Element, value: Any) -> Optional[Edge]:
        """Return the main edge of the dictionary entry ``elem[value]``.

        Returns ``None`` when *elem* is not a ``URIRef`` or no such entry
        exists.  The key is matched the same way as in ``read_dict``.
        """
        if not isinstance(elem, URIRef):
            return None
        rows = self._select("read_dict_edge", var1=elem, var2=self._encode_value(value))
        if not rows:
            return None
        return rows[0][0]

    def read_dict_node(self, elem: Element, value_node: Node) -> Optional[Element]:
        """Return the target of the entry of *elem* whose key node is *value_node*.

        Returns ``None`` when either argument is not a ``URIRef`` or no such
        entry exists.
        """
        if not isinstance(elem, URIRef) or not isinstance(value_node, URIRef):
            return None
        rows = self._select("read_dict_node", var1=elem, var2=value_node)
        if not rows:
            return None
        return rows[0][0]

    def read_dict_node_edge(self, elem: Element, value_node: Node) -> Optional[Edge]:
        """Return the main edge of the entry of *elem* whose key node is *value_node*.

        Returns ``None`` when either argument is not a ``URIRef`` or no such
        entry exists.
        """
        if not isinstance(elem, URIRef) or not isinstance(value_node, URIRef):
            return None
        rows = self._select("read_dict_node_edge", var1=elem, var2=value_node)
        if not rows:
            return None
        return rows[0][0]

    def read_reverse_dict(self, elem: Element, value: Any) -> Optional[List[Element]]:
        """Return every source that has a dictionary entry ``source[value] -> elem``.

        Returns ``None`` when *elem* is not a ``URIRef``; otherwise a
        (possibly empty) list.  The key is matched the same way as in
        ``read_dict``.
        """
        if not isinstance(elem, URIRef):
            return None
        rows = self._select(
            "read_reverse_dict", var1=elem, var2=self._encode_value(value)
        )
        return [row[0] for row in rows]

    # ------------------------------------------------------------------
    # Delete
    # ------------------------------------------------------------------

    def delete_node(self, node: Node) -> None:
        """Delete *node*, its value and every edge attached to it.

        The call is a no-op for the root, for anything that is not a
        ``URIRef`` and for elements that are actually edges.
        """
        if node == self.root:
            return
        if not isinstance(node, URIRef):
            return
        # An element with a source or a target triple is an edge, not a node.
        if (node, self.mv.hasSource, None) in self.graph or (node, self.mv.hasTarget, None) in self.graph:
            return
        # Drop the value triple, if there is one.
        self.graph.remove((node, None, None))
        # Collect the attached edges first, then delete them: the deletion
        # rewrites the graph the query is reading from.
        attached = [row[0] for row in self._select("delete_node", var1=node)]
        for edge in attached:
            self.delete_edge(edge)
        self.garbage.add(node)

    def delete_edge(self, edge: Edge) -> None:
        """Delete *edge* and, recursively, every edge attached to it.

        The call is a no-op when *edge* is not a ``URIRef`` or does not have
        both a source and a target triple.
        """
        if not isinstance(edge, URIRef):
            return
        has_source = (edge, self.mv.hasSource, None) in self.graph
        has_target = (edge, self.mv.hasTarget, None) in self.graph
        if not (has_source and has_target):
            return
        # Remove the edge's own source/target triples.
        self.graph.remove((edge, None, None))
        # Collect dependants before recursing; see delete_node.
        attached = [row[0] for row in self._select("delete_edge", var1=edge)]
        for dependant in attached:
            self.delete_edge(dependant)
        self.garbage.add(edge)
|