
Improve sub-query handling in new cell service call

Arkadiusz Ryś 2 years ago
parent commit 275af584e2
3 changed files with 220 additions and 260 deletions
  1. +25 -259 spendpoint/endpoint.py
  2. +195 -0 spendpoint/router.py
  3. +0 -1 spendpoint/service.py

+ 25 - 259
spendpoint/endpoint.py

@@ -1,276 +1,42 @@
 # Copied and modified from https://pypi.org/project/rdflib-endpoint/
+# https://fastapi.tiangolo.com/
 
 import logging
-import re
 import arklog
-import pandas as pd
-import rdflib
-from typing import Any, Dict, List, Optional, Union
-from urllib import parse
-from fastapi import FastAPI, Query, Request, Response
-from fastapi.responses import JSONResponse
-from rdflib import ConjunctiveGraph, Dataset, Graph, Literal, URIRef
-from rdflib.plugins.sparql import prepareQuery
-from rdflib.plugins.sparql.evaluate import evalPart
-from rdflib.plugins.sparql.evalutils import _eval
-from rdflib.plugins.sparql.parserutils import CompValue
-from rdflib.plugins.sparql.sparql import QueryContext, SPARQLError
-
-from spendpoint import service
+import time
+from typing import Any
+from fastapi import FastAPI, Request, Response
+from fastapi.middleware.cors import CORSMiddleware
+from spendpoint.router import SparqlRouter
 
 arklog.set_config_logging()
 
-
 class SparqlEndpoint(FastAPI):
     """SPARQL endpoint for services and storage of heterogeneous data."""
 
-    @staticmethod
-    def is_json_mime_type(mime: str) -> bool:
-        """"""
-        return mime.split(",")[0] in ("application/sparql-results+json","application/json","text/javascript","application/javascript")
-
-    @staticmethod
-    def is_csv_mime_type(mime: str) -> bool:
-        """"""
-        return mime.split(",")[0] in ("text/csv", "application/sparql-results+csv")
-
-    @staticmethod
-    def is_xml_mime_type(mime: str) -> bool:
-        """"""
-        return mime.split(",")[0] in ("application/xml", "application/sparql-results+xml")
-
-    @staticmethod
-    def is_turtle_mime_type(mime: str) -> bool:
-        """"""
-        return mime.split(",")[0] in ("text/turtle",)
-
-    async def requested_result_type(self, request: Request, operation: str) -> str:
-        output_mime_type = request.headers["accept"]
-        # TODO Ugly hack, fix later (Fuseki sends options)
-        output_mime_type = output_mime_type.split(",")[0]
-        # TODO Another ugly hack because if we get */* we don't really know what to return
-        if output_mime_type == "*/*":
-            output_mime_type = "application/json"
-        if isinstance(output_mime_type, list):
-            return output_mime_type[0]
-        # TODO Use match or dict for this
-        if not output_mime_type:
-            logging.warning("No mime type provided. Setting mimetype to 'application/xml'.")
-            return "application/xml"
-        if operation == "Construct Query" and (self.is_json_mime_type(output_mime_type) or self.is_csv_mime_type(output_mime_type)):
-            return "text/turtle"
-        if operation == "Construct Query" and output_mime_type == "application/xml":
-            return "application/rdf+xml"
-        return output_mime_type
-
-    def __init__(self, *args: Any, title: str, description: str, version: str, configuration, graph: Union[Graph, ConjunctiveGraph, Dataset] = ConjunctiveGraph(), **kwargs: Any):
+    def __init__(self, *args: Any, title: str, description: str, version: str, configuration, **kwargs: Any):
         """"""
         """"""
-        self.graph = graph
         self.title = title
         self.title = title
         self.description = description
         self.description = description
         self.version = version
         self.version = version
         self.configuration = configuration
         self.configuration = configuration
         super().__init__(*args, title=title, description=description, version=version, **kwargs)
         super().__init__(*args, title=title, description=description, version=version, **kwargs)
         logging.debug(self.description)
         logging.debug(self.description)
-        rdflib.plugins.sparql.CUSTOM_EVALS["evalCustomFunctions"] = self.eval_custom_functions
-        api_responses: Optional[Dict[Union[int, str], Dict[str, Any]]] = {
-            200: {
-                "description": "SPARQL query results",
-                "content": {
-                    "application/sparql-results+json": {
-                        "results": {"bindings": []},
-                        "head": {"vars": []},
-                    },
-                    "application/json": {
-                        "results": {"bindings": []},
-                        "head": {"vars": []},
-                    },
-                    "text/csv": {"example": "s,p,o"},
-                    "application/sparql-results+csv": {"example": "s,p,o"},
-                    "text/turtle": {"example": "service description"},
-                    "application/sparql-results+xml": {"example": "<root></root>"},
-                    "application/xml": {"example": "<root></root>"},
-                },
-            },
-            400: {
-                "description": "Bad Request",
-            },
-            403: {
-                "description": "Forbidden",
-            },
-            422: {
-                "description": "Unprocessable Entity",
-            },
-        }
-
-        @self.get("/", name="SPARQL endpoint", description="", responses=api_responses)
-        async def sparql_endpoint_get(request: Request, query: Optional[str] = Query(None)) -> Response:
-            logging.debug("Received GET request.")
-            if not query:
-                logging.warning("No query provided in GET request!")
-                return JSONResponse({"error": "No query provided."})
-
-            graph_ns = {}
-            for prefix, ns_uri in self.graph.namespaces():
-                graph_ns[prefix] = ns_uri
-
-            try:
-                parsed_query = prepareQuery(query, initNs=graph_ns)
-                query_operation = re.sub(r"(\w)([A-Z])", r"\1 \2", parsed_query.algebra.name)
-            except Exception as e:
-                logging.error("Error parsing the SPARQL query: " + str(e))
-                return JSONResponse(
-                    status_code=400,
-                    content={"message": "Error parsing the SPARQL query"},
-                )
-
-            try:
-                query_results = self.graph.query(query, initNs=graph_ns)
-            except Exception as e:
-                logging.error("Error executing the SPARQL query on the RDFLib Graph: " + str(e))
-                # TODO Send better error which can be parsed as a SPARQL response or check it client side
-                return JSONResponse(
-                    status_code=400,
-                    content={"message": "Error executing the SPARQL query on the RDFLib Graph"},
-                )
-            output_mime_type = await self.requested_result_type(request, query_operation)
-            logging.debug(f"Returning {output_mime_type}.")
-            try:
-                if self.is_csv_mime_type(output_mime_type):
-                    return Response(query_results.serialize(format="csv"), media_type=output_mime_type)
-                elif self.is_json_mime_type(output_mime_type):
-                    return Response(query_results.serialize(format="json"), media_type=output_mime_type)
-                elif self.is_xml_mime_type(output_mime_type):
-                    return Response(query_results.serialize(format="xml"), media_type=output_mime_type)
-                elif self.is_turtle_mime_type(output_mime_type):
-                    return Response(query_results.serialize(format="turtle"), media_type=output_mime_type)
-                return Response(query_results.serialize(format="xml"), media_type="application/sparql-results+xml")
-            except Exception as e:
-                logging.exception(e)
-                return JSONResponse(status_code=400, content={"message": "Error executing the SPARQL query on the RDFLib Graph"})
-
-        @self.post("/", name="SPARQL endpoint", description="", responses=api_responses)
-        async def sparql_endpoint_post(request: Request, query: Optional[str] = Query(None)) -> Response:
-            logging.debug("Received POST request.")
-            if not query:
-                # Handle federated query services which provide the query in the body
-                query_body = await request.body()
-                body = query_body.decode("utf-8")
-                parsed_query = parse.parse_qsl(body)
-                for params in parsed_query:
-                    if params[0] == "query":
-                        query = parse.unquote(params[1])
-            return await sparql_endpoint_get(request, query)
-
-
-        # TODO Could also be query parameters
-        # TODO BIG UGLY HACK UPON HACK UPON HACK
-        """
-        PREFIX dtf: <https://ontology.rys.app/dt/function/>
-        SELECT * WHERE {
-          bind(str('http://localhost:8000') as ?base)
-          bind(str('cell') as ?operation)
-          bind(str('example.csv') as ?file)
-          bind(str(0) as ?row)
-          bind(str(0) as ?column)
-          bind(iri(concat(?base, "/", ?operation, "/", ?file, "/", ?row, "/", ?column, "/")) as ?call)
-          SERVICE ?call {?cell ?cell ?cell}
-        }
-
-        PREFIX dtf: <https://ontology.rys.app/dt/function/>
-        SELECT ?cell WHERE {
-          bind(iri(concat("http://localhost:8000/cell/","example.csv/0/0/")) as ?call)
-          SERVICE ?call {}
-        }
-
-        PREFIX dtf: <https://ontology.rys.app/dt/function/>
-        SELECT ?cell WHERE {
-          BIND(uri(<http://localhost:8000/cell/example.csv/0/0/>) AS ?mep)
-          SERVICE ?mep {
-            SELECT * WHERE {
-            }
-          }
-        }
-
-        PREFIX dtf: <https://ontology.rys.app/dt/function/>
-        SELECT * WHERE {
-          BIND("\"example.csv\"" as ?filepath)
-          BIND(0 as ?row)
-          BIND(0 as ?column)
-          BIND(CONCAT(?filepath,"/",STR(?row),"/",STR(?column),"/") as ?params)
-          BIND(uri(CONCAT("<http://localhost:8000/cell/",?params,">")) as ?call)
-          SERVICE ?call {SELECT * WHERE {}}
-        }
-        """
-        @self.get("/cell/{file_name}/{row}/{column}/", name="SPARQL endpoint", description="", responses=api_responses)
-        async def sparql_cell_endpoint_get(request: Request, file_name, row, column, query: Optional[str] = Query(None)) -> Response:
-            logging.debug("Received cell GET request.")
-
-            query=f"""
-            PREFIX dtf: <https://ontology.rys.app/dt/function/>
-            SELECT * WHERE {{BIND(dtf:cell("data/{file_name}", {row}, {column}) AS ?cell)}}
-            """
-
-            graph_ns = {}
-            for prefix, ns_uri in self.graph.namespaces():
-                graph_ns[prefix] = ns_uri
-
-            try:
-                parsed_query = prepareQuery(query, initNs=graph_ns)
-                query_operation = re.sub(r"(\w)([A-Z])", r"\1 \2", parsed_query.algebra.name)
-            except Exception as e:
-                logging.error("Error parsing the SPARQL query: " + str(e))
-                return JSONResponse(
-                    status_code=400,
-                    content={"message": "Error parsing the SPARQL query"},
-                )
-
-            try:
-                query_results = self.graph.query(query, initNs=graph_ns)
-            except Exception as e:
-                logging.error("Error executing the SPARQL query on the RDFLib Graph: " + str(e))
-                # TODO Send better error which can be parsed as a SPARQL response or check it client side
-                return JSONResponse(
-                    status_code=400,
-                    content={"message": "Error executing the SPARQL query on the RDFLib Graph"},
-                )
-            output_mime_type = await self.requested_result_type(request, query_operation)
-            logging.debug(f"Returning {output_mime_type}.")
-            try:
-                if self.is_csv_mime_type(output_mime_type):
-                    return Response(query_results.serialize(format="csv"), media_type=output_mime_type)
-                elif self.is_json_mime_type(output_mime_type):
-                    return Response(query_results.serialize(format="json"), media_type=output_mime_type)
-                elif self.is_xml_mime_type(output_mime_type):
-                    return Response(query_results.serialize(format="xml"), media_type=output_mime_type)
-                elif self.is_turtle_mime_type(output_mime_type):
-                    return Response(query_results.serialize(format="turtle"), media_type=output_mime_type)
-                return Response(query_results.serialize(format="xml"), media_type="application/sparql-results+xml")
-            except Exception as e:
-                logging.exception(e)
-                return JSONResponse(status_code=400,
-                                    content={"message": "Error executing the SPARQL query on the RDFLib Graph"})
-
-
-    def eval_custom_functions(self, ctx: QueryContext, part: CompValue) -> List[Any]:
-        if part.name != "Extend":
-            raise NotImplementedError()
-
-        query_results = []
-        logging.debug("Custom evaluation.")
-        for eval_part in evalPart(ctx, part.p):
-            # Checks if the function is a URI (custom function)
-            if hasattr(part.expr, "iri"):
-                for conf_service in self.configuration.services:
-                    # Check if URI correspond to a registered custom function
-                    if part.expr.iri == URIRef(conf_service.namespace):
-                        query_results, ctx, part, eval_part = getattr(service, conf_service.call)(query_results, ctx, part, eval_part, conf_service)
-            else:
-                # For built-in SPARQL functions (that are not URIs)
-                evaluation: List[Any] = [_eval(part.expr, eval_part.forget(ctx, _except=part._vars))]
-                if isinstance(evaluation[0], SPARQLError):
-                    raise evaluation[0]
-                # Append results for built-in SPARQL functions
-                for result in evaluation:
-                    query_results.append(eval_part.merge({part.var: Literal(result)}))
-        return query_results
+        sparql_router = SparqlRouter(title=title, description=description, version=version, configuration=configuration)
+        self.include_router(sparql_router)
+        self.add_middleware(
+            CORSMiddleware,
+            allow_origins=["*"],
+            allow_credentials=True,
+            allow_methods=["*"],
+            allow_headers=["*"],
+        )
+
+        @self.middleware("http")
+        async def add_process_time_header(request: Request, call_next: Any) -> Response:
+            start_time = time.time()
+            response: Response = await call_next(request)
+            duration = str(time.time() - start_time)
+            response.headers["X-Process-Time"] = duration
+            logging.debug(f"X-Process-Time = {duration}")
+            return response
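
After this refactor, SparqlEndpoint only wires up CORS and timing middleware and delegates all routes to SparqlRouter. A minimal sketch of serving it (illustrative, not part of the commit; assumes uvicorn is installed and uses a hypothetical stub for the configuration object, which eval_custom_functions expects to expose a services list whose entries carry namespace and call attributes):

    import uvicorn
    from types import SimpleNamespace
    from spendpoint.endpoint import SparqlEndpoint

    # Hypothetical configuration stub; a real deployment would register
    # custom services here (see eval_custom_functions in router.py).
    configuration = SimpleNamespace(services=[])

    app = SparqlEndpoint(
        title="spendpoint",
        description="SPARQL endpoint for services and storage of heterogeneous data.",
        version="0.0.0",
        configuration=configuration,
    )
    uvicorn.run(app, host="localhost", port=8000)

Every response then carries an X-Process-Time header with the request duration in seconds, also logged at debug level.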

+ 195 - 0
spendpoint/router.py

@@ -0,0 +1,195 @@
+import logging
+import rdflib
+import pandas as pd
+from typing import Any, List, Optional
+from urllib import parse
+from rdflib.plugins.sparql import prepareQuery
+from rdflib.plugins.sparql.processor import SPARQLResult
+from spendpoint import service
+from fastapi import APIRouter, Query, Request, Response
+from fastapi.responses import JSONResponse
+from rdflib import ConjunctiveGraph, Literal, URIRef
+from rdflib.plugins.sparql.evaluate import evalPart
+from rdflib.plugins.sparql.evalutils import _eval
+from rdflib.plugins.sparql.parserutils import CompValue
+from rdflib.plugins.sparql.sparql import QueryContext, SPARQLError
+from rdflib.namespace import CSVW, DC, DCAT, DCTERMS, DOAP, FOAF, ODRL2, ORG, OWL, PROF, PROV, RDF, RDFS, SDO, SH, SKOS, SOSA, SSN, TIME, VOID, XMLNS, XSD
+
+
+CONTENT_TYPE_TO_RDFLIB_FORMAT = {
+    # https://www.w3.org/TR/sparql11-results-json/
+    "application/sparql-results+json": "json",
+    "application/json": "json",
+    "text/json": "json",
+    # https://www.w3.org/TR/rdf-sparql-XMLres/
+    "application/sparql-results+xml": "xml",
+    "application/xml": "xml",  # for compatibility
+    "application/rdf+xml": "xml",  # for compatibility
+    "text/xml": "xml",  # not standard
+    # https://www.w3.org/TR/sparql11-results-csv-tsv/
+    "application/sparql-results+csv": "csv",
+    "text/csv": "csv",  # for compatibility
+    # Extras
+    "text/turtle": "ttl",
+}
+DEFAULT_CONTENT_TYPE = "application/json"
+
+
+def parse_accept_header(accept: str) -> List[str]:
+    """
+    Given an accept header string, return a list of media types in order of preference.
+
+    :param accept: Accept header value
+    :return: Ordered list of media type preferences
+    """
+
+    def _parse_preference(qpref: str) -> float:
+        qparts = qpref.split("=")
+        try:
+            return float(qparts[1].strip())
+        except ValueError:
+            pass
+        except IndexError:
+            pass
+        return 1.0
+
+    preferences = []
+    types = accept.split(",")
+    dpref = 2.0
+    for mtype in types:
+        parts = mtype.split(";")
+        parts = [part.strip() for part in parts]
+        pref = dpref
+        try:
+            for part in parts[1:]:
+                if part.startswith("q="):
+                    pref = _parse_preference(part)
+                    break
+        except IndexError:
+            pass
+        # preserve order of appearance in the list
+        dpref = dpref - 0.01
+        preferences.append((parts[0], pref))
+    preferences.sort(key=lambda x: -x[1])
+    return [pref[0] for pref in preferences]
+
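
A quick sketch of how parse_accept_header orders preferences (illustrative calls, not part of the commit):

    # Explicit q-values rank below unweighted types, which default to a
    # descending preference (2.0, 1.99, ...) in order of appearance.
    parse_accept_header("text/csv;q=0.5, application/json")
    # -> ["application/json", "text/csv"]

    parse_accept_header("application/sparql-results+xml, text/turtle")
    # -> ["application/sparql-results+xml", "text/turtle"]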
+
+class SparqlRouter(APIRouter):
+    """Class to deploy a SPARQL endpoint using a RDFLib Graph."""
+
+    def __init__(self, *args: Any, title: str, description: str, version: str, configuration, **kwargs: Any):
+        self.title = title
+        self.description = description
+        self.version = version
+        self.configuration = configuration
+        super().__init__(*args, **kwargs)
+        rdflib.plugins.sparql.CUSTOM_EVALS["evalCustomFunctions"] = self.eval_custom_functions
+
+        async def encode_graph_query_results(request, query_results):
+            """"""
+            mime_types = parse_accept_header(request.headers.get("accept", DEFAULT_CONTENT_TYPE))
+            output_mime_type = DEFAULT_CONTENT_TYPE
+            for mime_type in mime_types:
+                if mime_type in CONTENT_TYPE_TO_RDFLIB_FORMAT:
+                    output_mime_type = mime_type
+                    break
+            logging.debug(f"Returning {output_mime_type}.")
+            try:
+                rdflib_format = CONTENT_TYPE_TO_RDFLIB_FORMAT[output_mime_type]
+                response = Response(query_results.serialize(format=rdflib_format), media_type=output_mime_type)
+            except Exception as e:
+                logging.error(f"Error serializing the SPARQL query results with RDFLib: {e}")
+                return JSONResponse(status_code=422, content={"message": "Error serializing the SPARQL query results."})
+            else:
+                return response
+
+        @self.get("/")
+        async def sparql_endpoint_get(request: Request, query: Optional[str] = Query(None)) -> Response:
+            """"""
+            # The graph is empty, so you would expect queries against it to never return any results.
+            # But bindings are injected by the custom functions during evaluation!
+            logging.debug("Received GET request.")
+            if not query:
+                logging.warning("No query provided in GET request!")
+                return JSONResponse({"message": "No query provided."})
+
+            graph = ConjunctiveGraph()
+            try:
+                query_results = graph.query(query)
+            except Exception as e:
+                logging.error("Error executing the SPARQL query on the RDFLib Graph: " + str(e))
+                return JSONResponse(status_code=400, content={"message": "Error executing the SPARQL query on the RDFLib Graph."})
+
+            return await encode_graph_query_results(request, query_results)
+
+        @self.post("/")
+        async def sparql_endpoint_post(request: Request, query: Optional[str] = Query(None)) -> Response:
+            """"""
+            logging.debug("Received POST request.")
+            if not query:
+                query_body = await request.body()
+                body = query_body.decode("utf-8")
+                parsed_query = parse.parse_qsl(body)
+                for params in parsed_query:
+                    if params[0] == "query":
+                        query = parse.unquote(params[1])
+            return await sparql_endpoint_get(request, query)
+
+
+        @self.get("/cell/{file_name}/{row}/{column}/")
+        async def sparql_cell_endpoint_get(request: Request, file_name, row, column, query: Optional[str] = Query(None)) -> Response:
+            """
+            Create an ephemeral graph store based on the call parameters and perform the requested query.
+
+            Example federated query:
+            SELECT ?s ?p ?o WHERE {
+              bind(str('http://localhost:8000') as ?base)
+              bind(str('cell') as ?operation)
+              bind(str('example.csv') as ?file)
+              bind(str(2) as ?row)
+              bind(str(2) as ?column)
+              bind(iri(concat(?base, "/", ?operation, "/", ?file, "/", ?row, "/", ?column, "/")) as ?call)
+              SERVICE ?call {?s ?p ?o}
+            }
+            """
+            logging.debug(f"Received cell GET request [{file_name}->{row}:{column}].")
+            graph = ConjunctiveGraph()
+            graph_ns = dict(graph.namespaces())
+            # graph_ns["tabular"] = "http://ua.be/sdo2l/vocabulary/formalisms/tabular#"
+            df = pd.read_csv(f"data/{file_name}", index_col=None, header=None)
+            cell_value = df.iat[int(row), int(column)]
+            cell = URIRef(f"http://ua.be/sdo2l/vocabulary/formalisms/tabular#Cell_{row}_{column}")
+            # Store the triples in a temporary graph. This allows us to use the rdflib query engine for the sub-query instead of finding the matching pairs manually.
+            graph.add((cell, URIRef("http://ua.be/sdo2l/vocabulary/formalisms/tabular#holdsContent"), Literal(cell_value)))
+            graph.add((cell, URIRef("http://ua.be/sdo2l/vocabulary/formalisms/tabular#hasRowPosition"), Literal(int(row))))
+            graph.add((cell, URIRef("http://ua.be/sdo2l/vocabulary/formalisms/tabular#hasColumnPosition"), Literal(int(column))))
+            logging.debug(f"{cell_value=}")
+
+            try:
+                query_results = graph.query(query, initNs=graph_ns)
+            except Exception as e:
+                logging.error("Error executing the SPARQL query on the RDFLib Graph: " + str(e))
+                return JSONResponse(status_code=400, content={"message": "Error executing the SPARQL query on the RDFLib Graph."})
+            return await encode_graph_query_results(request, query_results)
+
+
+    def eval_custom_functions(self, ctx: QueryContext, part: CompValue) -> List[SPARQLResult]:
+        if part.name != "Extend":
+            raise NotImplementedError()
+
+        query_results = []
+        logging.debug("Custom evaluation.")
+        for eval_part in evalPart(ctx, part.p):
+            # Checks if the function is a URI (custom function)
+            if hasattr(part.expr, "iri"):
+                for conf_service in self.configuration.services:
+                    # Check if the URI corresponds to a registered custom function
+                    if part.expr.iri == URIRef(conf_service.namespace):
+                        query_results, ctx, part, eval_part = getattr(service, conf_service.call)(query_results, ctx, part, eval_part, conf_service)
+            else:
+                # For built-in SPARQL functions (that are not URIs)
+                evaluation: List[Any] = [_eval(part.expr, eval_part.forget(ctx, _except=part._vars))]
+                if isinstance(evaluation[0], SPARQLError):
+                    raise evaluation[0]
+                # Append results for built-in SPARQL functions
+                for result in evaluation:
+                    query_results.append(eval_part.merge({part.var: Literal(result)}))
+        return query_results
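
A sketch of calling the new cell endpoint directly over HTTP (illustrative, not part of the commit; assumes the requests library, a service running on localhost:8000, and an existing data/example.csv, as in the docstring example above; the tabular predicate is written in full because the prefix binding is commented out in the handler):

    import requests

    query = """
    SELECT ?content WHERE {
      ?cell <http://ua.be/sdo2l/vocabulary/formalisms/tabular#holdsContent> ?content .
    }
    """
    response = requests.get(
        "http://localhost:8000/cell/example.csv/0/0/",
        params={"query": query},
        headers={"Accept": "application/sparql-results+json"},
    )
    print(response.json())  # SPARQL results JSON with a single ?content binding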

+ 0 - 1
spendpoint/service.py

@@ -19,7 +19,6 @@ class Outlier:
 
 def outlier_service(query_results, ctx, part, eval_part, service_configuration):
     """
-
     Example query:
     PREFIX xsd: <http://www.w3.org/2001/XMLSchema#>
     PREFIX rdf: <http://www.w3.org/1999/02/22-rdf-syntax-ns#>