소스 검색

Create conversion service

Arkadiusz Ryś 2 년 전
부모
커밋
2383075958
6개의 변경된 파일에 56개의 추가 그리고 10개의 삭제
  1. 4 1
      requirements.txt
  2. 1 4
      spendpoint/endpoint.py
  3. 2 1
      spendpoint/main.py
  4. 31 2
      spendpoint/service.py
  5. 16 0
      tasks.py
  6. 2 2
      tests/test_query_endpoint.py

+ 4 - 1
requirements.txt

@@ -1,7 +1,9 @@
 # SpEndPoint
 arklog            ~= 0.5.1
 rdflib            ~= 6.2.0
-fastapi           ~= 0.92
+pandas            ~= 1.5.3
+fastapi           ~= 0.93.0
+pyarrow           ~= 11.0.0
 requests          ~= 2.28.2
 starlette         ~= 0.25.0
 python-magic      ~= 0.4.27
@@ -16,6 +18,7 @@ tox      ~= 4.4.6
 pip      ~= 23.0.1
 flit     ~= 3.8.0
 twine    ~= 4.0.2
+numpy    ~= 1.24.2
 invoke   ~= 2.0.0
 jinja2   ~= 3.1.2
 flake8   ~= 6.0.0

+ 1 - 4
spendpoint/endpoint.py

@@ -43,13 +43,11 @@ class SparqlEndpoint(FastAPI):
         return mime.split(",")[0] in ("text/turtle",)
 
     async def requested_result_type(self, request: Request, operation: str) -> str:
-        logging.debug("Getting mime type.")
         output_mime_type = request.headers["accept"]
         # TODO Ugly hack, fix later (Fuseki sends options)
         output_mime_type = output_mime_type.split(",")[0]
         if isinstance(output_mime_type, list):
             return output_mime_type[0]
-
         # TODO Use match or dict for this
         if not output_mime_type:
             logging.warning("No mime type provided. Setting mimetype to 'application/xml'.")
@@ -127,12 +125,11 @@ class SparqlEndpoint(FastAPI):
                 query_results = self.graph.query(query, initNs=graph_ns)
             except Exception as e:
                 logging.error("Error executing the SPARQL query on the RDFLib Graph: " + str(e))
+                # TODO Send better error which can be parsed as a SPARQL response or check it client side
                 return JSONResponse(
                     status_code=400,
                     content={"message": "Error executing the SPARQL query on the RDFLib Graph"},
                 )
-
-            logging.debug(f"{type(query_results)=}")
             output_mime_type = await self.requested_result_type(request, query_operation)
             logging.debug(f"Returning {output_mime_type}.")
             try:

+ 2 - 1
spendpoint/main.py

@@ -1,7 +1,7 @@
 import arklog
 from spendpoint.endpoint import SparqlEndpoint
 from spendpoint import __version__
-from spendpoint.service import outlier_service, example_service
+from spendpoint.service import outlier_service, example_service, conversion_service
 
 arklog.set_config_logging()
 
@@ -10,6 +10,7 @@ app = SparqlEndpoint(
     functions = {
         "https://ontology.rys.app/dt/function/outlier": outlier_service,
         "https://ontology.rys.app/dt/function/example": example_service,
+        "https://ontology.rys.app/dt/function/conversion": conversion_service,
     },
     title = "SPARQL endpoint for storage and services",
     description = "/n".join(("SPARQL endpoint.",))

+ 31 - 2
spendpoint/service.py

@@ -1,10 +1,13 @@
 import logging
+from pathlib import Path
+
 import arklog
 import rdflib
-from rdflib import Literal
+import pandas as pd
+from rdflib import Literal, XSD
 from rdflib.plugins.sparql.evalutils import _eval
 from dataclasses import dataclass
-
+from timeit import default_timer as timer
 from spendpoint.bridge import fetch_outliers
 arklog.set_config_logging()
 
@@ -53,6 +56,32 @@ def outlier_service(query_results, ctx, part, eval_part):
     return query_results, ctx, part, eval_part
 
 
+def conversion_service(query_results, ctx, part, eval_part):
+    """"""
+    logging.debug(f"Conversion service.")
+    input_file_name = str(_eval(part.expr.expr[0], eval_part.forget(ctx, _except=part.expr._vars)))
+    output_file_name = str(_eval(part.expr.expr[1], eval_part.forget(ctx, _except=part.expr._vars)))
+    data_dir = Path(__file__).resolve().parent.parent / Path("data")
+    input_file_path = data_dir / Path(input_file_name)
+    output_file_path = data_dir / Path(output_file_name)
+    success = False
+    start_time = timer()
+    if input_file_path.suffix.endswith("csv") and output_file_path.suffix.endswith("parquet"):
+        df = pd.read_csv(input_file_path)
+        df.to_parquet(output_file_path)
+        success = True
+    end_time = timer()
+    query_results.append(eval_part.merge({
+        part.var: Literal(""),
+        rdflib.term.Variable(part.var + "_input") : Literal(input_file_name),
+        rdflib.term.Variable(part.var + "_output") : Literal(output_file_name),
+        rdflib.term.Variable(part.var + "_duration") : Literal(end_time - start_time, datatype=XSD.duration),
+        rdflib.term.Variable(part.var + "_success") : Literal(success),
+    }))
+    return query_results, ctx, part, eval_part
+
+
+
 def example_service(query_results, ctx, part, eval_part):
     """"""
     logging.debug(f"{query_results=}")

+ 16 - 0
tasks.py

@@ -71,3 +71,19 @@ def release(c, version):
     c.run(f"git push")
     c.run(f"git branch -d release-{_major}.{_minor}.{_patch}")
     c.run(f"git push origin --tags")
+
+
+@task(name="generate", aliases=("gen", "csv"))
+def generate_random_data_csv(c, rows=200000, columns=50):
+    """"""
+    import numpy as np
+    import uuid
+    data_dir = Path(__file__).resolve().parent / Path("data")
+    out_file_path = data_dir / Path("example.csv")
+    chunk = 1000
+    current_row = 0
+    with out_file_path.open("w", encoding="utf-8", buffering=chunk) as csv_file:
+        while current_row < rows:
+            data = [[uuid.uuid4() for i in range(chunk)], np.random.random(chunk) * 100, np.random.random(chunk) * 50, *[np.random.randint(1000, size=(chunk,)) for x in range(columns - 3)]]
+            csv_file.writelines([('%s,%.6f,%.6f,%i' + (',%i' * (columns - 4)) + '\n') % row for row in zip(*data)])
+            current_row += chunk

+ 2 - 2
tests/test_query_endpoint.py

@@ -8,8 +8,8 @@ arklog.set_config_logging()
 prefixes = "\n".join((
     "PREFIX dtf:  <https://ontology.rys.app/dt/function/>",
     "PREFIX owl:  <http://www.w3.org/2002/07/owl#>",
-    "PREFIX rdf: <http://www.w3.org/1999/02/22-rdf-syntax-ns#>",
-    "PREFIX xsd: <http://www.w3.org/2001/XMLSchema#>",
+    "PREFIX rdf:  <http://www.w3.org/1999/02/22-rdf-syntax-ns#>",
+    "PREFIX xsd:  <http://www.w3.org/2001/XMLSchema#>",
     "PREFIX rdfs: <http://www.w3.org/2000/01/rdf-schema#>",
 ))