Browse Source

Implemented better PM enactment: with spawned tasks

Yentl Van Tendeloo 6 years ago
parent
commit
d65caf2848
1 changed files with 116 additions and 76 deletions
  1. 116 76
      bootstrap/core_algorithm.alc

+ 116 - 76
bootstrap/core_algorithm.alc

@@ -20,6 +20,7 @@ String core_model_location = "models/core"
 Element core = ?
 String current_user_id
 Element caches
+Element pm_tasks = ?
 
 String function get_foldername(name : String):
 	Element result
@@ -465,20 +466,6 @@ Boolean function check_conformance(model_id : String):
 
 	return True!
 
-Boolean function pm_finished(worklist : Element, pm : String):
-	Element finished
-	Integer cnt
-	Integer i
-	// Check if any of the "finish" elements are in the worklist
-	// If so, we can already finish, and therefore will stop immediately
-	finished = allInstances(pm, "Finish")
-	worklist = set_copy(worklist)
-	while (set_len(worklist) > 0):
-		// Check each finished element individually
-		if (set_in(finished, list_read(set_pop(worklist), 0))):
-			return True!
-	return False!
-
 Element function merge_models(models_dict : Element, operation_name : String):
 	core = import_node(core_model_location)
 	// 0) Find operation signature
@@ -861,6 +848,14 @@ Element function PM_signature(pm : Element):
 
 	return result!
 
+Void function enact_PM_activity(activity_to_task : Element, task_to_result : Element, pm : Element, element : String, mapping : Element):
+	Boolean result
+	pm_tasks = activity_to_task
+	set_add(activity_to_task[element], get_taskname())
+	result = enact_action(pm, element, mapping)
+	dict_add_fast(task_to_result, get_taskname(), result)
+	return!
+
 Void function enact_PM(pm : Element, mapping : Element):
 	Element worklist
 	String element
@@ -869,6 +864,7 @@ Void function enact_PM(pm : Element, mapping : Element):
 	Element tuple
 	Element counters
 	Element join_nodes
+	Element exec_nodes
 	Element keys
 	String key
 
@@ -897,76 +893,120 @@ Void function enact_PM(pm : Element, mapping : Element):
 	while (set_len(join_nodes) > 0):
 		dict_add(counters, set_pop(join_nodes), 0)
 
+	// Initialize activity to task dictionary with empty sets
+	Element activity_to_task
+	Element task_to_activity
+	Element task_to_result
+
+	activity_to_task = dict_create()
+	task_to_activity = dict_create()
+	task_to_result = dict_create()
+	exec_nodes = allInstances(pm, "Exec")
+	while (set_len(exec_nodes) > 0):
+		dict_add(activity_to_task, set_pop(exec_nodes), set_create())
+
 	// Create the worklist with the Start instance as first element
 	worklist = set_create()
-	set_add_node(worklist, create_tuple(set_pop(allInstances(pm, "Start")), True))
-
-	while (bool_not(pm_finished(worklist, pm))):
-		// Pop a random element from the list and execute it
-		tuple = set_pop(worklist)
-		element = tuple[0]
-		result = tuple[1]
-
-		// Find the type (to see what to do with it)
-		//   this does not yet yield the type of transformation, if it is an Execution
-		type = read_type(pm, element)
-
-		if (type == "Start"):
-			// Initial node, just progress to the next elements
-			// Nothing to do here though, as we have just started
-			result = True
-		elif (type == "Finish"):
-			// Should be impossible, as we would have ended...
-			result = result
-		elif (type == "Fork"):
-			result = result
-		elif (type == "Join"):
-			// Only do this if all dependencies are fullfilled
-			// So add to the counter of this Join
-			dict_overwrite(counters, element, integer_addition(counters[element], 1))
-
-			// Now check whether we have enough tokens to execute the Join itself
-			Integer required
-			Integer got
-			required = set_len(allIncomingAssociationInstances(pm, element, "Next")) + set_len(allIncomingAssociationInstances(pm, element, "Else"))
-			got = counters[element]
-			if (got == required):
-				// Reset counter to 0
-				dict_overwrite(counters, element, 0)
-
-				// And continue
-			else:
-				// We haven't gotten all yet, so we wait (i.e., continue without adding Next link to worklist)
-				continue!
+	set_add(worklist, set_pop(allInstances(pm, "Start")))
 
-		elif (type == "Exec"):
-			// Execute a transformation
-			// This the difficult part!
+	// Keep on iterating until we reach finish
+	while (True):
+		// Check if there are PM elements to expand
+		if (set_len(worklist) > 0):
+			// There is something we can do, so do it!
+			element = set_pop(worklist)
 
-			result = enact_action(pm, element, mapping)
-			output("Success")
+			// Find the type (to see what to do with it)
+			//   this does not yet yield the type of transformation, if it is an Execution
+			type = read_type(pm, element)
 
-		elif (type == "Decision"):
-			// If the previous result is true, we add the normal one, otherwise the false one
-			// in this context, that means that if it is false, we should add it manually to the list, and then continue the simulation loop
-			if (bool_not(result)):
-				// Apparently it is False, so map this to the "Else" branch
-				set_add_node(worklist, create_tuple(set_pop(allAssociationDestinations(pm, element, "Else")), True))
-				continue!
-			else:
-				// Apparently it is True, so map this to the "Then" branch
-				set_add_node(worklist, create_tuple(set_pop(allAssociationDestinations(pm, element, "Then")), True))
+			// Some types have nothing to do, such as start and fork
+			// Therefore, they are not mentioned in the following conditional
+
+			if (type == "Finish"):
+				// We have finished, so terminate
+				break!
+			elif (type == "Join"):
+				// Only do this if all dependencies are fullfilled
+				// So add to the counter of this Join
+				dict_overwrite(counters, element, integer_addition(counters[element], 1))
+
+				// Now check whether we have enough tokens to execute the Join itself
+				Integer required
+				Integer got
+				required = set_len(allIncomingAssociationInstances(pm, element, "Next")) + set_len(allIncomingAssociationInstances(pm, element, "Else"))
+				got = counters[element]
+				if (got == required):
+					// Reset counter to 0
+					dict_overwrite(counters, element, 0)
+
+					// And continue
+				else:
+					// We haven't gotten all yet, so we wait (i.e., continue without adding Next link to worklist)
+					continue!
+
+			elif (type == "Exec"):
+				// Execute a transformation
+				// This the difficult part!
+
+				//result = enact_action(pm, element, mapping)
+				//output("Success")
+
+				Element args
+				String taskname
+				args = list_create()
+				list_append(args, activity_to_task)
+				list_append(args, task_to_result)
+				list_append(args, pm)
+				list_append(args, element)
+				list_append(args, mapping)
+				taskname = spawn(enact_PM_activity, args)
+				output("Spawned task for activity: " + taskname)
+				dict_add(task_to_activity, taskname, element)
 				continue!
 
-		// We have finished the execution, so add all outgoing edges to the worklist
-		Element all_next
-		all_next = allAssociationDestinations(pm, element, "Next")
-		String next
-		while (set_len(all_next) > 0):
-			next = set_pop(all_next)
-			set_add_node(worklist, create_tuple(next, result))
+			// We have finished the execution, so add all outgoing edges to the worklist
+			Element all_next
+			all_next = allAssociationDestinations(pm, element, "Next")
+			String next
+			while (set_len(all_next) > 0):
+				next = set_pop(all_next)
+				set_add(worklist, next)
+		else:
+			// No new tasks to spawn, so go and check on all running activities
+			if (dict_len(task_to_result) > 0):
+				// There are outputs!
+				Element keys
+				String task
+
+				keys = dict_keys(task_to_result)
+				while (set_len(keys) > 0):
+					task = set_pop(keys)
+					output("Success: " + task)
+					result = task_to_result[task]
+
+					Element all_next
+					all_next = allAssociationDestinations(pm, task_to_activity[task], "Next")
+					String next
+					while (set_len(all_next) > 0):
+						next = set_pop(all_next)
+
+						if (read_type(pm, next) == "Decision"):
+							// Got decision node, so expand immediately
+							if (result):
+								set_add(worklist, set_pop(allAssociationDestinations(pm, next, "Then")))
+							else:
+								set_add(worklist, set_pop(allAssociationDestinations(pm, next, "Else")))
+						else:
+							// Other node, so just append for further processing
+							set_add(worklist, next)
 
-	// Reached a finish element, so stop
+					set_remove(activity_to_task[task_to_activity[task]], task)
+					dict_delete(task_to_result, task)
+					dict_delete(task_to_activity, task)
+			else:
+				// No finished activities either, so we sleep for some time
+				sleep(0.1)
 
 	// Remove all mock locations again
 	while (set_len(mock_locations) > 0):