Browse Source

Parallelized + split vis/sim

rparedis 2 years ago
parent
commit
e10eae3f5c

+ 3 - 0
2023.09.07-meeting-Havenhuis.txt

@@ -0,0 +1,3 @@
+more tugboats available then crew availability for the tugboats. So 17 boats available, but 11 crews.
+
+between 6 and 8: crew changes

BIN
de2/__pycache__/elements.cpython-38.pyc


BIN
de2/__pycache__/routing.cpython-38.pyc


BIN
de2/__pycache__/tracer.cpython-38.pyc


+ 93 - 88
de2/elements.py

@@ -53,6 +53,11 @@ class Vessel:
 	name: str = ""
 	category: str = "weak"
 
+	def tuple(self):
+		return self.mmsi, self.name,\
+		       self.source, self.target,\
+		       self.task, self.total_distance, self.distance_left, self.velocity
+
 
 class Pool(AtomicDEVS):
 	def __init__(self, name):
@@ -71,63 +76,63 @@ class Pool(AtomicDEVS):
 	def extTransition(self, inputs):
 		self.state["time"] += self.elapsed
 		if self.req_in in inputs:
-			request = inputs[self.req_in]
-			# TODO: prefill with all vessels of the simulation?
-			if request["mmsi"] in self.state["waiting"]:
-				vessel = self.state["waiting"][request["mmsi"]]
-			else:
-				print("VESSEL %s DOES NOT EXIST IN POOL" % str(request["mmsi"]))
-				# print("\t", request)
-				vessel = Vessel(request["mmsi"])
-
-			vessel.footprint.idle_time += self.state["time"] - vessel.footprint.idle_since
-
-			# vessel.time_until_departure = (request["end"] - request["start"]) / 1000
-			# vessel.velocity = request["distance"] / vessel.time_until_departure
-			# vessel.distance_left = request["distance"]
-			# vessel.total_distance = request["distance"]
-			# vessel.source = (request["source_lon"], request["source_lat"])
-			# vessel.target = (request["target_lon"], request["target_lat"])
-			# vessel.location = request["location"]
-
-			vessel.velocity = request["velocity"]
-			if vessel.velocity > 0:
-				# requests for idle times should be ignored!
-				vessel.distance_left = request["distance"]
-				vessel.total_distance = vessel.distance_left
-				vessel.time_until_departure = vessel.distance_left / vessel.velocity
-				vessel.source = request["source_lon"], request["source_lat"]
-				vessel.target = request["target_lon"], request["target_lat"]
-				vessel.task = request["task"]
-
-				vessel.name = request["name"]
-				vessel.category = request["category"]
-
-				self.state["should_exit"].append(vessel)
-				if vessel.mmsi in self.state["waiting"]:
-					del self.state["waiting"][vessel.mmsi]
-		if self.vessel_in in inputs:
-			vessel = inputs[self.vessel_in]
-			if vessel.mmsi in self.state["waiting"]:
-				raise ValueError("Cannot create duplicate vessels (loc = %s; mmsi = %s; d = %f)" %
-				                 (str(vessel.location), str(vessel.mmsi), vessel.total_distance))
-			else:
-				vessel.source = vessel.target
-				vessel.target = None
+			for request in inputs[self.req_in]:
+				# TODO: prefill with all vessels of the simulation?
+				if request["mmsi"] in self.state["waiting"]:
+					vessel = self.state["waiting"][request["mmsi"]]
+				else:
+					print("VESSEL %s DOES NOT EXIST IN POOL" % str(request["mmsi"]))
+					# print("\t", request)
+					vessel = Vessel(request["mmsi"])
+
+				vessel.footprint.idle_time += self.state["time"] - vessel.footprint.idle_since
+
+				# vessel.time_until_departure = (request["end"] - request["start"]) / 1000
+				# vessel.velocity = request["distance"] / vessel.time_until_departure
+				# vessel.distance_left = request["distance"]
+				# vessel.total_distance = request["distance"]
+				# vessel.source = (request["source_lon"], request["source_lat"])
+				# vessel.target = (request["target_lon"], request["target_lat"])
+				# vessel.location = request["location"]
+
+				vessel.velocity = request["velocity"]
 				if vessel.velocity > 0:
-					time = vessel.total_distance / vessel.velocity
-					# vessel.footprint.fuel_usage += Footprint.get_fuel(vessel.total_distance, vessel.velocity)
-					vessel.footprint.usage_time += time
-					if vessel.task == "sailing":
-						vessel.footprint.sailing_time += time
-					else:
-						vessel.footprint.tugging_time += time
-					vessel.footprint.velocity_total += vessel.velocity
-					vessel.footprint.tasks += 1
-
-				vessel.footprint.idle_since = self.state["time"]
-
-				self.state["waiting"][vessel.mmsi] = vessel
+					# requests for idle times should be ignored!
+					vessel.distance_left = request["distance"]
+					vessel.total_distance = vessel.distance_left
+					vessel.time_until_departure = vessel.distance_left / vessel.velocity
+					vessel.source = request["source_lon"], request["source_lat"]
+					vessel.target = request["target_lon"], request["target_lat"]
+					vessel.task = request["task"]
+
+					vessel.name = request["name"]
+					vessel.category = request["category"]
+
+					self.state["should_exit"].append(vessel)
+					if vessel.mmsi in self.state["waiting"]:
+						del self.state["waiting"][vessel.mmsi]
+		if self.vessel_in in inputs:
+			for vessel in inputs[self.vessel_in]:
+				if vessel.mmsi in self.state["waiting"]:
+					raise ValueError("Cannot create duplicate vessels (loc = %s; mmsi = %s; d = %f)" %
+					                 (str(vessel.location), str(vessel.mmsi), vessel.total_distance))
+				else:
+					vessel.source = vessel.target
+					vessel.target = None
+					if vessel.velocity > 0:
+						time = vessel.total_distance / vessel.velocity
+						# vessel.footprint.fuel_usage += Footprint.get_fuel(vessel.total_distance, vessel.velocity)
+						vessel.footprint.usage_time += time
+						if vessel.task == "sailing":
+							vessel.footprint.sailing_time += time
+						else:
+							vessel.footprint.tugging_time += time
+						vessel.footprint.velocity_total += vessel.velocity
+						vessel.footprint.tasks += 1
+
+					vessel.footprint.idle_since = self.state["time"]
+
+					self.state["waiting"][vessel.mmsi] = vessel
 		return self.state
 
 	def timeAdvance(self):
@@ -138,7 +143,7 @@ class Pool(AtomicDEVS):
 	def outputFnc(self):
 		if len(self.state["should_exit"]) > 0:
 			return {
-				self.vessel_out: self.state["should_exit"][0]
+				self.vessel_out: [self.state["should_exit"][0]]
 			}
 		return {}
 
@@ -165,8 +170,8 @@ class Sailer(AtomicDEVS):
 		self.state["time"] += self.elapsed
 		self.update_vessels(self.elapsed)
 		if self.vessel_in in inputs:
-			vessel = inputs[self.vessel_in]
-			self.state["vessels"].append(vessel)
+			for vessel in inputs[self.vessel_in]:
+				self.state["vessels"].append(vessel)
 		self.state["vessels"].sort(key=lambda v: v.time_until_departure)
 		return self.state
 
@@ -185,7 +190,7 @@ class Sailer(AtomicDEVS):
 	def outputFnc(self):
 		if len(self.state["vessels"]) > 0:
 			return {
-				self.vessel_out: self.state["vessels"][0]
+				self.vessel_out: [self.state["vessels"][0]]
 			}
 		return {}
 
@@ -235,7 +240,7 @@ class Tracer(AtomicDEVS):
 	def outputFnc(self):
 		if self.state["index"] < len(self.ivef):
 			return {
-				self.reqs: self.ivef.iloc[self.state["index"]]
+				self.reqs: [self.ivef.iloc[self.state["index"]]]
 			}
 		return {}
 
@@ -244,7 +249,7 @@ class Scheduler(AtomicDEVS):
 	def __init__(self, name, ivef):
 		super(Scheduler, self).__init__(name)
 
-		self.ivef = pd.read_csv(ivef, usecols=["mmsi", "start", "ETA", "source", "target", "task"])
+		self.ivef = pd.read_csv(ivef, usecols=["mmsi", "start", "ETA", "source", "target", "task"], dtype={"mmsi": str})
 		self.ivef.sort_values(by=["start"])
 		self.starting_time = self.ivef.iloc[0]["start"]
 		# self.ivef["start"] -= self.starting_time
@@ -271,64 +276,64 @@ class Scheduler(AtomicDEVS):
 	def outputFnc(self):
 		if self.state["index"] < len(self.ivef):
 			return {
-				self.reqs: self.ivef.iloc[self.state["index"]]
+				self.reqs: [self.ivef.iloc[self.state["index"]]]
 			}
 		return {}
 
 
-class Planner(AtomicDEVS):
+class RoutePlanner(AtomicDEVS):
 	def __init__(self, name):
-		super(Planner, self).__init__(name)
+		super(RoutePlanner, self).__init__(name)
 
 		self.graph = get_graph()
 		self.tugs = pd.read_excel("20230405_Tugs.xlsx", dtype={"MMSI": str, "NAME": str, "category": str})
 
 		self.state = {
-			"request": None
+			"request": []
 		}
 
 		self.req_in = self.addInPort("req_in")
 		self.req_out = self.addOutPort("req_out")
 
 	def timeAdvance(self):
-		if self.state["request"] is None:
+		if len(self.state["request"]) == 0:
 			return INFINITY
 		return 0.0
 
 	def extTransition(self, inputs):
 		if self.req_in in inputs:
-			request = inputs[self.req_in]
-
-			tug = self.tugs[self.tugs["MMSI"] == str(request["mmsi"])].iloc[0]
-			request["name"] = tug["NAME"]
-			request["category"] = tug["category"]
+			for request in inputs[self.req_in]:
+				tug = self.tugs[self.tugs["MMSI"] == str(request["mmsi"])].iloc[0]
+				request["name"] = tug["NAME"]
+				request["category"] = tug["category"]
 
-			request["source_lon"], request["source_lat"] = eval(request["source"])
-			request["target_lon"], request["target_lat"] = eval(request["target"])
+				request["source_lon"], request["source_lat"] = eval(request["source"])
+				request["target_lon"], request["target_lat"] = eval(request["target"])
 
-			src_vertex, _ = get_closest_vertex(self.graph, request["source_lon"], request["source_lat"])
-			tgt_vertex, _ = get_closest_vertex(self.graph, request["target_lon"], request["target_lat"])
+				src_vertex, _ = get_closest_vertex(self.graph, request["source_lon"], request["source_lat"])
+				tgt_vertex, _ = get_closest_vertex(self.graph, request["target_lon"], request["target_lat"])
 
-			request["distance"] = self.graph.distances([src_vertex.index], [tgt_vertex.index], weights="distance", mode="all")[0][0]
+				request["distance"] = self.graph.distances([src_vertex.index], [tgt_vertex.index], weights="distance", mode="all")[0][0]
 
-			# TODO: check this value with PoAB
-			# TODO: make this a distribution instead?
-			# request["velocity"] = 6.17  # about 12 knots
-			request["velocity"] = request["distance"] / ((request["ETA"] - request["start"]) / 1000)
+				# TODO: check this value with PoAB
+				# TODO: make this a distribution instead?
+				# request["velocity"] = 6.17  # about 12 knots
+				request["velocity"] = request["distance"] / ((request["ETA"] - request["start"]) / 1000)
 
-			# UNUSED!
-			# request["end"] = request["start"] + request["distance"] / request["velocity"]
+				# UNUSED!
+				# request["end"] = request["start"] + request["distance"] / request["velocity"]
 
-			self.state["request"] = request
+				self.state["request"].append(request)
 		return self.state
 
 	def outputFnc(self):
-		if self.state["request"] is None:
+		if len(self.state["request"]) == 0:
 			return {}
-		return { self.req_out: self.state["request"] }
+		return { self.req_out: [self.state["request"][0]] }
 
 	def intTransition(self):
-		self.state["request"] = None
+		if len(self.state["request"]) > 0:
+			self.state["request"].pop(0)
 		return self.state
 
 
@@ -354,7 +359,7 @@ class Port(CoupledDEVS):
 
 		self.clock = self.addSubModel(Clock("clock"))
 		self.scheduler = self.addSubModel(Scheduler("scheduler", ivef))
-		self.planner = self.addSubModel(Planner("planner"))
+		self.planner = self.addSubModel(RoutePlanner("planner"))
 		# self.tracer = self.addSubModel(Tracer("tracer", ivef))
 		self.pool = self.addSubModel(Pool("pool"))
 		self.sailer = self.addSubModel(Sailer("sailer"))

+ 26 - 1
de2/routing.py

@@ -161,4 +161,29 @@ def pathfinder(graph, source, target):
 
 if __name__ == '__main__':
     graph = get_graph()
-    graph.write_picklez("paths/cache.pickle.gz")
+    # graph.write_picklez("paths/cache.pickle.gz")
+
+    mapper = {}
+    with open("paths/endpointsWGS84.json", 'r') as file:
+        features = json.load(file)["features"]
+    for feature in features:
+        mapper[feature["attributes"]["NUMMER"]] = (feature["geometry"]["x"], feature["geometry"]["y"])
+
+    import pandas as pd
+
+    pf = pathfinder(graph, mapper["355"], mapper["602"])
+
+    print(pf, sum(pf[1]))
+
+    # berths = pd.read_csv("berths.csv", index_col=0)
+    # # matrix = pd.DataFrame(columns=berths["lpl_id"].tolist(), index=berths["lpl_id"].tolist())
+    #
+    # bths = list(set(berths["lpl_id"]))
+    # vtx = [get_closest_vertex(graph, *mapper[x])[0] for x in bths if x in mapper]
+    # # vtx = list(set(vtx))
+    #
+    # distance = graph.distances(vtx, vtx, mode="all")
+    #
+    # cols = [x for x in bths if x in mapper]
+    # matrix = pd.DataFrame(data=distance, columns=cols, index=cols)
+    # matrix.to_csv("matrix.csv")

+ 129 - 0
de2/tracer.py

@@ -0,0 +1,129 @@
+from pypdevs.tracers.tracerBase import BaseTracer
+from pypdevs.util import runTraceAtController
+import sys
+from dataclasses import dataclass
+
+from de2.elements import Pool, Sailer
+
+
+# TODO: Update always!
+class TracerPort(BaseTracer):
+    """
+    A tracer for port simulation output
+    """
+    def __init__(self, uid, server, filename):
+        """
+        Constructor
+
+        :param uid: the UID of this tracer
+        :param server: the server to make remote calls on
+        :param filename: file to save the trace to, can be None for output to stdout
+        """
+        super(TracerPort, self).__init__(uid, server)
+        if server.getName() == 0:
+            self.filename = filename
+        else:
+            self.filename = None
+        self.prevtime = (-1, -1)
+
+    def startTracer(self, recover):
+        """
+        Starts up the tracer
+
+        :param recover: whether or not this is a recovery call (so whether or not the file should be appended to)
+        """
+        if self.filename is None:
+            self.verb_file = sys.stdout
+        elif isinstance(self.filename, str):
+            if recover:
+                self.verb_file = open(self.filename, 'a+')
+            else:
+                self.verb_file = open(self.filename, 'w')
+        else:
+            self.verb_file = self.filename
+
+    def stopTracer(self):
+        """
+        Stop the tracer
+        """
+        self.verb_file.flush()
+
+    def trace(self, time, text):
+        """
+        Actual tracing function
+
+        :param time: time at which this trace happened
+        :param text: the text that was traced
+        """
+        string = ""
+        if time > self.prevtime:
+            string = "TIME: %10.6f\n" % time[0]
+            self.prevtime = time
+        string += text + "\n"
+        self.verb_file.write(string)
+
+    def _vesselStr(self, vessel):
+        # VESSEL: mmsi, name, source, target, task, total distance, distance left, velocity
+        return "'%s', '%s', %s, %s, '%s', %.6f, %.6f, %.6f" % vessel.tuple()
+
+    def traceInternal(self, aDEVS):
+        """
+        Tracing done for the internal transition function
+
+        :param aDEVS: the model that transitioned
+        """
+        if isinstance(aDEVS, Pool):
+            for vessel in aDEVS.state["waiting"].values():
+                runTraceAtController(self.server, self.uid, aDEVS,
+                                     [aDEVS.time_last, '"' + self._vesselStr(vessel) + '"'])
+        elif isinstance(aDEVS, Sailer):
+            for vessel in aDEVS.state["vessels"]:
+                runTraceAtController(self.server, self.uid, aDEVS,
+                                     [aDEVS.time_last, '"' + self._vesselStr(vessel) + '"'])
+
+    def traceExternal(self, aDEVS):
+        """
+        Tracing done for the external transition function
+
+        :param aDEVS: the model that transitioned
+        """
+        if isinstance(aDEVS, Pool):
+            for vessel in aDEVS.state["waiting"].values():
+                runTraceAtController(self.server, self.uid, aDEVS,
+                                     [aDEVS.time_last, '"' + self._vesselStr(vessel) + '"'])
+        elif isinstance(aDEVS, Sailer):
+            for vessel in aDEVS.state["vessels"]:
+                runTraceAtController(self.server, self.uid, aDEVS,
+                                     [aDEVS.time_last, '"' + self._vesselStr(vessel) + '"'])
+
+
+@dataclass
+class StreamedVessel:
+    mmsi: str
+    name: str
+    source: tuple
+    target: tuple
+    task: str
+    total_distance: float
+    distance_left: float
+    velocity: float
+
+
+class Streamer:
+    def __init__(self):
+        self.time = 0
+        self.starting_time = None
+        self.vessels = []
+
+    def write(self, text: str):
+        texts = text.split("\n")
+        for t in texts:
+            if t.startswith("TIME: "):
+                self.time = float(t[6:])
+                if self.starting_time is None:
+                    self.starting_time = self.time
+                self.vessels.clear()
+            elif len(t) > 0:
+                self.vessels.append(StreamedVessel(*eval(t)))
+
+    def flush(self): pass

File diff suppressed because it is too large
+ 39 - 13
mapper.py


+ 1 - 0
results-de2/plan.csv

@@ -391,3 +391,4 @@ mmsi,start,ETA,source,target,task
 205257290,1643642917825,1643643517824,"(4.351315757000066, 51.26427078800003)","(4.349825623000072, 51.265297763000035)",tugging
 205257290,1643643517824,1643673577824,"(4.349825623000072, 51.265297763000035)","(4.34999, 51.26505)",tugging
 205203390,1643647324324,1643673541824,"(4.320220451000068, 51.31285972400008)","(4.37787, 51.25886)",tugging
+,,,,,