Browse Source

creating the code to call services related to automated activities using
async http calls.

Lucas Albertins 1 year ago
parent
commit
af8096214d

+ 5 - 0
pom.xml

@@ -59,6 +59,11 @@
 			<artifactId>spring-boot-starter-test</artifactId>
 			<scope>test</scope>
 		</dependency>
+		<dependency>
+        	<groupId>org.asynchttpclient</groupId>
+        	<artifactId>async-http-client</artifactId>
+        	<version>3.0.0.Beta2</version>
+        </dependency>
 	</dependencies>
 
 	<build>

+ 5 - 11
src/main/java/ua/be/wee/controller/EnactmentControllerMVC.java

@@ -40,7 +40,6 @@ import ua.be.wee.model.pt.StartTraceEvent;
 import ua.be.wee.model.pt.TraceArtifact;
 import ua.be.wee.model.repository.FusekiWrapper;
 import ua.be.wee.model.util.Pair;
-import ua.be.wee.service.FileStorageService;
 
 @Controller
 @RequestMapping("/")
@@ -54,9 +53,6 @@ public class EnactmentControllerMVC {
 	@Autowired
 	private Environment env;
 	
-	@Autowired
-	FileStorageService storageService;
-
     @Value("${base_url}")
     private String baseUrl;
 
@@ -168,6 +164,7 @@ public class EnactmentControllerMVC {
 			request.getSession().setAttribute("pm", pm);
 			request.getSession().setAttribute("trace", trace);
 			request.getSession().setAttribute("endacts",endActs);
+			request.getSession().setAttribute("automated",new ArrayList<AutomatedActivity>());
 			model.addAttribute("arts", null);
 			model.addAttribute("current", "1");
 	        return "enact";
@@ -279,17 +276,16 @@ public class EnactmentControllerMVC {
     	List<TraceArtifact> arts = (List<TraceArtifact>)request.getSession().getAttribute("arts");
     	if (element.getPort() != null) {
     		ControlInputPort port = (ControlInputPort)element.getPort();
-        	controller.addStartEvent(pt,port, port.getActivity(),arts);
-        	removeElement(acts,iri);
         	Activity activity = port.getActivity();
         	if (activity instanceof AutomatedActivity) {
 				AutomatedActivity aut = (AutomatedActivity)activity;
-				controller.callAutomatedActivity(aut, arts);
+				controller.callAutomatedActivity(pt,aut, arts);
 				automated.add(aut);
 			} else {
 				endacts.add(port.getActivity());
-	        	
 			}
+        	controller.addStartEvent(pt,port, port.getActivity(),arts);
+        	removeElement(acts,iri);
         	request.getSession().removeAttribute("arts");
         	return "enact";
 		} else if (element.getNode() != null) {
@@ -300,8 +296,6 @@ public class EnactmentControllerMVC {
 		} else {
 			return "error";
 		}
-    	
-    	
     }
     
     
@@ -332,7 +326,7 @@ public class EnactmentControllerMVC {
 					tArt.setLocation(part.getSubmittedFileName());
 					tArt.setRelatesTo(artifact);
 					traceArts.add(tArt);
-					storageService.save(part);
+					controller.uploadArtifact(part.getInputStream(),part.getSubmittedFileName());
 				}
 			}  
 		}

+ 2 - 1
src/main/java/ua/be/wee/controller/PMTrigger.java

@@ -1,5 +1,6 @@
 package ua.be.wee.controller;
 
+import ua.be.wee.model.nodes.AutomatedActivity;
 import ua.be.wee.model.nodes.FinalNode;
 import ua.be.wee.model.nodes.ports.ControlInputPort;
 
@@ -27,7 +28,7 @@ public class PMTrigger {
 	
 	public String getName() {
 		if (port != null) {
-			return port.getActivity().getName() + " VIA PORT: " + port.getName();
+			return (port.getActivity() instanceof AutomatedActivity ? "AUT: " + port.getActivity().getName() + " VIA PORT: " + port.getName() :  port.getActivity().getName() + " VIA PORT: " + port.getName());
 		} else {
 			return "END TRACE";
 		}

+ 36 - 2
src/main/java/ua/be/wee/model/EnactmentController.java

@@ -1,14 +1,21 @@
 package ua.be.wee.model;
 
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 
 import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.boot.configurationprocessor.json.JSONException;
 import org.springframework.stereotype.Component;
 
 import ua.be.wee.model.nodes.Activity;
+import ua.be.wee.model.nodes.Artifact;
 import ua.be.wee.model.nodes.AutomatedActivity;
 import ua.be.wee.model.nodes.ports.ControlInputPort;
 import ua.be.wee.model.nodes.ports.ControlOutputPort;
+import ua.be.wee.model.nodes.ports.DataInputPort;
 import ua.be.wee.model.pm.PM;
 import ua.be.wee.model.pt.Event;
 import ua.be.wee.model.pt.PT;
@@ -16,7 +23,9 @@ import ua.be.wee.model.pt.TraceArtifact;
 import ua.be.wee.model.repository.NodeRespository;
 import ua.be.wee.model.repository.PMRepository;
 import ua.be.wee.model.repository.PTRepository;
+import ua.be.wee.model.util.AsyncHttpClientService;
 import ua.be.wee.model.util.Pair;
+import ua.be.wee.service.FileStorageService;
 
 @Component
 public class EnactmentController {
@@ -30,6 +39,9 @@ public class EnactmentController {
 	@Autowired
 	private PTRepository traceRepo;
 	
+	@Autowired
+	private FileStorageService storageService;
+	
 	public List<PM> getAllPMs() {
 		return pmRepo.getAllPMs();
 	}
@@ -67,8 +79,30 @@ public class EnactmentController {
 		
 	}
 
-	public void callAutomatedActivity(AutomatedActivity aut, List<TraceArtifact> arts) {
-		// TODO Auto-generated method stub
+	public void callAutomatedActivity(PT pt, AutomatedActivity aut, List<TraceArtifact> arts) throws JSONException, IOException {
+		Map<String,Pair<String,String>> params = createParamsMap(aut, arts);
+		AsyncHttpClientService.asyncHTTPClient(pt, aut, params,this);
 		
 	}
+
+	private Map<String,Pair<String,String>> createParamsMap(AutomatedActivity aut, List<TraceArtifact> arts) {
+		Map<String,Pair<String,String>> params = new HashMap<String, Pair<String,String>>();
+		List<DataInputPort> datalInPorts = aut.getDatalInPorts();
+		
+		for (DataInputPort dataInputPort : datalInPorts) {
+			for (TraceArtifact traceArtifact : arts) {
+				Artifact relatesTo = traceArtifact.getRelatesTo();
+				if (dataInputPort.getArtifact().getIri().equals(relatesTo.getIri())) {
+					traceArtifact.getLocation();
+					String content = storageService.load(traceArtifact.getLocation());
+					params.put(dataInputPort.getName(), new Pair<String, String>(traceArtifact.getFileExtension(), content));
+				}
+			}
+		}
+		return params;
+	}
+
+	public void uploadArtifact(InputStream is, String filename) throws IOException {
+		storageService.save(is,filename);	
+	}
 }

+ 5 - 5
src/main/java/ua/be/wee/model/repository/NodeRespository.java

@@ -42,7 +42,7 @@ public class NodeRespository {
 				+ "  	owl:sameAs <" + pm.getIri() + "> ;\n" 
 				+ "  	ob:hasObject ?node .\n" 
 				+ "  ?node a ?nodetype .\n"
-				+ "  FILTER (?nodetype in (pm:Initial, pm:Activity, pm:Final, pm:ForkJoin, pm:Artifact)) .      \n"
+				+ "  FILTER (?nodetype in (pm:Initial, pm:Activity, pm:AutomatedActivity, pm:Final, pm:ForkJoin, pm:Artifact)) .      \n"
 				+ "}";
 		ResultSet results = FusekiWrapper.getInstance().execQuery(query);
 		while (results.hasNext()) {
@@ -318,13 +318,13 @@ public class NodeRespository {
 				+ "PREFIX ftg: <http://ua.be/sdo2l/vocabulary/formalisms/ftg#>\n"
 				+ "PREFIX base: <http://ua.be/sdo2l/vocabulary/base/base#>\n"
 				+ "PREFIX pm: <http://ua.be/sdo2l/vocabulary/formalisms/pm#>\n" 
-				+ "SELECT ?act ?name ?type WHERE {\n"
+				+ "SELECT ?act ?name ?type ?timeout ?endpoint WHERE {\n"
 				+ "	?act owl:sameAs <" + iri + ">;\n" 
 				+ "   		pm:isTransformation ?t ;  \n"
-				+ "   		pm:hasName ?name .  \n" 
+				+ "   		pm:hasName ?name ;  \n" 
 				+ "   		pm:hasTimeout ?timeout .  \n"
 				+ " ?t base:hasGUID ?type ;  \n" 
-				+ "   		ftg:endpoint ?endpoint .  \n"
+				+ "   		ftg:hasEndpoint ?endpoint .  \n"
 				+ "}";
 		ResultSet results = FusekiWrapper.getInstance().execQuery(query);
 		if (results.hasNext()) {
@@ -337,7 +337,7 @@ public class NodeRespository {
 			act.setIri(iri);
 			act.setName(name.toString());
 			act.setTransformationName(type.toString());
-			act.setTimeout(Long.parseLong(timeout.toString()));
+			act.setTimeout(timeout.asLiteral().getLong());
 			act.setEndpoint(endpoint.toString());
 
 			act.setCtrlInPorts(getCtrlInPorts(act));

+ 156 - 0
src/main/java/ua/be/wee/model/util/AsyncHttpClientService.java

@@ -0,0 +1,156 @@
+package ua.be.wee.model.util;
+
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.TimeoutException;
+
+import org.asynchttpclient.AsyncCompletionHandler;
+import org.asynchttpclient.AsyncHttpClient;
+import org.asynchttpclient.DefaultAsyncHttpClientConfig;
+import org.asynchttpclient.Dsl;
+import org.asynchttpclient.Request;
+import org.asynchttpclient.Response;
+import org.springframework.boot.configurationprocessor.json.JSONException;
+import org.springframework.boot.configurationprocessor.json.JSONObject;
+
+import ua.be.wee.model.EnactmentController;
+import ua.be.wee.model.nodes.AutomatedActivity;
+import ua.be.wee.model.nodes.ports.ControlOutputPort;
+import ua.be.wee.model.nodes.ports.DataOutputPort;
+import ua.be.wee.model.pt.PT;
+import ua.be.wee.model.pt.TraceArtifact;
+import ua.be.wee.service.FileStorageService;
+import ua.be.wee.service.FileStorageServiceImpl;
+
+public class AsyncHttpClientService {
+	
+	public static void asyncHTTPClient(PT pt, AutomatedActivity act, Map<String,Pair<String,String>> artifacts, EnactmentController control) throws JSONException, IOException {
+		DefaultAsyncHttpClientConfig.Builder clientBuilder = Dsl.config().setConnectTimeout(Duration.ofMillis(act.getTimeout()));
+		AsyncHttpClient client = Dsl.asyncHttpClient(clientBuilder);
+		
+		String requestBody = createJSONBody(artifacts);
+		
+		Request request = Dsl.post(act.getEndpoint()).setBody(requestBody).setRequestTimeout(Duration.ofMillis(act.getTimeout())).build();
+		
+		client.executeRequest(request, new AsyncCompletionHandler<String>() {
+			@Override
+		    public String onCompleted(Response response) throws Exception {
+				String result = response.getResponseBody();
+				int statusCode = response.getStatusCode();
+				System.out.println("Status Code: " + statusCode);
+				System.out.println("Body: " + result);
+				
+				if (statusCode == 200) {
+					JSONObject res = new JSONObject(result);
+					String outCtrl = res.getString("ctrl");
+					List<ControlOutputPort> ctrlOutPorts = act.getCtrlOutPorts();
+					ControlOutputPort ctrlOutPort = null;
+					for (ControlOutputPort controlOutputPort : ctrlOutPorts) {
+						if (controlOutputPort.getName().equals(outCtrl)) {
+							ctrlOutPort = controlOutputPort;
+							break;
+						}
+					}
+					if (ctrlOutPort == null) {
+						throw new Exception("The response of automated activity " + act.getName() + " did not return an output control port that matches one of the activity ports.");
+					}
+					
+					JSONObject outputs = res.getJSONObject("output");
+					List<TraceArtifact> traceArts = new ArrayList<TraceArtifact>();
+					for (Iterator iterator = outputs.keys(); iterator.hasNext();) {
+						String outDataName = (String) iterator.next();
+						List<DataOutputPort> dataOutPorts = act.getDataOutPorts();
+						
+						for (DataOutputPort dataOutputPort : dataOutPorts) {
+							if (outDataName.equals(dataOutputPort.getName())) {
+								JSONObject outData = outputs.getJSONObject(outDataName);
+								String type = outData.getString("type");
+								if (type.equals("inline")) {
+									String content = outData.getString("content");
+									String name = outData.getString("name");
+									InputStream is = new ByteArrayInputStream(content.getBytes());
+									control.uploadArtifact(is,name);
+									TraceArtifact tArt = new TraceArtifact();
+									tArt.setLocation(name);
+									tArt.setRelatesTo(dataOutputPort.getArtifact());
+									traceArts.add(tArt);
+								} else if (type.equals("url")) {
+									//TODO add possibility to link data to URL
+								}
+							}
+						}	
+					} 
+					Thread.sleep(1000);
+					control.addEndEvent(pt, traceArts, ctrlOutPort);
+					
+				} else {
+					throw new Exception("The response of automated activity " + act.getName() + " returned an unexpected status code: " + statusCode);
+				}
+				
+				
+				client.close();
+				return result;
+		    }
+			
+			@Override
+		    public void onThrowable(Throwable t) {
+				if (t instanceof TimeoutException) {
+					System.err.println("Timeout occurred!");
+				} else {
+					System.err.println("Another error occurred!");
+				}
+		    }
+		});
+		
+	}
+	
+	private static String createJSONBody(Map<String,Pair<String,String>> params) throws JSONException {
+		JSONObject body = new JSONObject();
+		JSONObject input = new JSONObject();
+		Set<String> keySet = params.keySet();
+		for (String dataPort : keySet) {
+			JSONObject inner = new JSONObject();
+			inner.put("type", "inline");
+			inner.put("content", params.get(dataPort).getSnd());
+			input.put(dataPort, inner);
+		}
+		body.put("input", input);
+		System.out.println(body.toString());
+		return body.toString();
+	}
+	
+	public static void main(String[] args) throws JSONException, IOException {
+		
+
+		FileStorageService storageService = new FileStorageServiceImpl();
+		storageService.setStorageURL("http://localhost:5000");
+				
+		AutomatedActivity a = new AutomatedActivity();
+		a.setTimeout(5000);
+		a.setEndpoint("http://localhost:7999");
+		
+		Map<String,Pair<String,String>> params = new HashMap<String, Pair<String,String>>();
+				
+		String content = storageService.load("mock_requirements.txt");
+		System.out.println(content);
+		params.put("din", new Pair<String, String>("txt", content+ "c"));
+		
+		EnactmentController enac = new EnactmentController();
+		
+		PT pt = new PT();
+		
+		asyncHTTPClient(pt, a, params,enac);
+		System.out.println("chegou aqui");
+
+	}
+
+
+}

+ 41 - 33
src/main/java/ua/be/wee/model/util/HTTPRequestClient.java

@@ -6,15 +6,18 @@ import java.net.http.HttpRequest;
 import java.net.http.HttpRequest.BodyPublisher;
 import java.net.http.HttpResponse;
 import java.time.Duration;
-import java.util.ArrayList;
-import java.util.Iterator;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
+import java.util.Set;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
+import java.util.stream.Collectors;
 
-import org.springframework.core.io.Resource;
+import org.springframework.boot.configurationprocessor.json.JSONException;
+import org.springframework.boot.configurationprocessor.json.JSONObject;
 
 import ua.be.wee.model.nodes.Artifact;
 import ua.be.wee.model.nodes.AutomatedActivity;
@@ -25,25 +28,26 @@ import ua.be.wee.service.FileStorageServiceImpl;
 
 public class HTTPRequestClient {
 
-	private static void asyncHTTPClient(AutomatedActivity act, List<TraceArtifact> artifacts) {
+	public static void asyncHTTPClient(AutomatedActivity act, Map<String,Pair<String,String>> artifacts) throws JSONException {
 		HttpClient httpClient = HttpClient.newBuilder().version(HttpClient.Version.HTTP_1_1)
 				.connectTimeout(Duration.ofMillis(act.getTimeout())).build();
 		
-		BodyPublisher requestBody = createJSONBody(act, artifacts);
+		BodyPublisher requestBody = createJSONBody(artifacts);
 		
 		HttpRequest request = HttpRequest.newBuilder().uri(URI.create(act.getEndpoint()))
 				.setHeader("User-Agent", "Java Async HTTPClient Example...").POST(requestBody).build();
-		CompletableFuture<HttpResponse<String>> asyncResponse = null;
+
+		
+		
 		// sendAsync(): Sends the given request asynchronously using this client with
 		// the given response body handler.
 		// Equivalent to: sendAsync(request, responseBodyHandler, null).
+		CompletableFuture<HttpResponse<String>> asyncResponse = httpClient.sendAsync(request, HttpResponse.BodyHandlers.ofString());
 
-		
-
-		asyncResponse = httpClient.sendAsync(request, HttpResponse.BodyHandlers.ofString());
 		String asyncResultBody = null;
 		int asyncResultStatusCode = 0;
 		try {
+			//asyncResponse.thenRun(() -> System.out.println());
 			asyncResultBody = asyncResponse.thenApply(HttpResponse::body).get(act.getTimeout(), TimeUnit.MILLISECONDS);
 
 			// OR:
@@ -56,7 +60,7 @@ public class HTTPRequestClient {
 			// this method throws an (unchecked) CompletionException with the underlying
 			// exception as its cause.
 
-			HttpResponse<String> response = asyncResponse.join();
+			//HttpResponse<String> response = asyncResponse.join();
 			asyncResultStatusCode = asyncResponse.thenApply(HttpResponse::statusCode).get(act.getTimeout(), TimeUnit.MILLISECONDS);
 		} catch (InterruptedException | ExecutionException | TimeoutException e) {
 			e.printStackTrace();
@@ -67,38 +71,42 @@ public class HTTPRequestClient {
 	}
 
 
-	private static BodyPublisher createJSONBody(AutomatedActivity act, List<TraceArtifact> artifacts) {
-		
-		
-		List<DataInputPort> datalInPorts = act.getDatalInPorts();
-		
-		for (DataInputPort dataInputPort : datalInPorts) {
-			for (TraceArtifact traceArtifact : artifacts) {
-				Artifact relatesTo = traceArtifact.getRelatesTo();
-				if (dataInputPort.getArtifact().getIri().equals(relatesTo.getIri())) {
-					traceArtifact.getLocation();
-				}
-			}
+
+	private static BodyPublisher createJSONBody(Map<String,Pair<String,String>> params) throws JSONException {
+		JSONObject body = new JSONObject();
+		JSONObject input = new JSONObject();
+		Set<String> keySet = params.keySet();
+		for (String dataPort : keySet) {
+			JSONObject inner = new JSONObject();
+			inner.put("type", "inline");
+			inner.put("content", params.get(dataPort).getSnd());
+			input.put(dataPort, inner);
 		}
-		
-		
-		return null;
+		body.put("input", input);
+		System.out.println(body.toString());
+		return HttpRequest.BodyPublishers.ofString(body.toString());
 	}
 
 
-	public static void main(String[] args) {
+	public static void main(String[] args) throws JSONException {
 		
 
 		FileStorageService storageService = new FileStorageServiceImpl();
 		storageService.setStorageURL("http://localhost:5000");
-		String load = storageService.load("trace.csv");
-		System.out.println(load);
+				
+		AutomatedActivity a = new AutomatedActivity();
+		a.setTimeout(5000);
+		a.setEndpoint("http://localhost:7999");
+		
+		Map<String,Pair<String,String>> params = new HashMap<String, Pair<String,String>>();
+				
+		String content = storageService.load("mock_requirements.txt");
+		System.out.println(content);
+		params.put("din", new Pair<String, String>("txt", content));
 		
-//		AutomatedActivity test = new AutomatedActivity();
-//		test.setEndpoint("https://crunchify.com/wp-content/java/crunchify-java-httpclient-tutorial.html");
-//		test.setTimeout(5000);
-//		List<TraceArtifact> arts = new ArrayList<TraceArtifact>();
-//		asyncHTTPClient(test,arts);
+		asyncHTTPClient(a, params);
+		System.out.println("chegou aqui");
+
 	}
 
 }

+ 2 - 5
src/main/java/ua/be/wee/service/FileStorageService.java

@@ -1,16 +1,13 @@
 package ua.be.wee.service;
 
+import java.io.InputStream;
 import java.nio.file.Path;
 import java.util.stream.Stream;
 
-import javax.servlet.http.Part;
-
-import org.springframework.core.io.Resource;
-
 public interface FileStorageService {
   public void init();
 
-  public void save(Part file);
+  public void save(InputStream file, String filename);
 
   public String load(String filename);
 

+ 3 - 10
src/main/java/ua/be/wee/service/FileStorageServiceImpl.java

@@ -1,7 +1,7 @@
 package ua.be.wee.service;
 
-import java.io.FileInputStream;
 import java.io.IOException;
+import java.io.InputStream;
 import java.net.MalformedURLException;
 import java.nio.file.FileAlreadyExistsException;
 import java.nio.file.Files;
@@ -9,22 +9,16 @@ import java.nio.file.Path;
 import java.nio.file.Paths;
 import java.util.stream.Stream;
 
-import javax.servlet.http.Part;
-
 import org.apache.http.HttpResponse;
 import org.apache.http.client.ClientProtocolException;
 import org.apache.http.client.methods.HttpGet;
 import org.apache.http.client.methods.HttpPut;
-import org.apache.http.entity.FileEntity;
 import org.apache.http.entity.InputStreamEntity;
-import org.apache.http.entity.StringEntity;
 import org.apache.http.impl.client.BasicResponseHandler;
 import org.apache.http.impl.client.CloseableHttpClient;
 import org.apache.http.impl.client.HttpClients;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.core.env.Environment;
-import org.springframework.core.io.Resource;
-import org.springframework.core.io.UrlResource;
 import org.springframework.stereotype.Service;
 import org.springframework.util.FileSystemUtils;
 
@@ -49,14 +43,13 @@ public class FileStorageServiceImpl implements FileStorageService {
 	}
 
 	@Override
-	public void save(Part file) {
+	public void save(InputStream file, String filename) {
 		try {
 			if (storageServiceURL == null) {
 				init();
 			}
 			//Files.copy(file.getInputStream(), this.root.resolve(file.getOriginalFilename()));
 			String url = "";
-			String filename = file.getSubmittedFileName();
 			switch (filename.substring(filename.lastIndexOf('.'))) {
 			case ".xopp":
 				url = storageServiceURL + "/files/xopp/"+filename;				
@@ -80,7 +73,7 @@ public class FileStorageServiceImpl implements FileStorageService {
 			// specify the PUT body to send to the server as part of the request
 			//httpPut.setEntity(new StringEntity(file.getInputStream().toString()));
 
-			httpPut.setEntity(new InputStreamEntity(file.getInputStream()));
+			httpPut.setEntity(new InputStreamEntity(file));
 			
 			
 			HttpResponse response = httpclient.execute(httpPut);