| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147 |
- from pypdevs.tracers.tracerBase import BaseTracer
- from pypdevs.util import runTraceAtController
- from dataclasses import dataclass
- import sys
- from de2.elements import Pool, Sailer, Clock, Scheduler
class TracerPort(BaseTracer):
    """
    Tracer that emits the port simulation's vessel states as text lines.

    The output consists of a ``START:`` line, ``TIME:`` header lines and one
    comma-separated line per traced vessel.
    """

    def __init__(self, uid, server, filename):
        """
        Create the tracer.

        :param uid: the UID of this tracer
        :param server: the server to make remote calls on
        :param filename: file to save the trace to, can be None for output to stdout
        """
        super().__init__(uid, server)
        # Only the server whose getName() is 0 keeps the requested
        # destination; any other server ends up with None.
        self.filename = filename if server.getName() == 0 else None
        self.prevtime = (-1, -1)
        self.starting_time = 0

    def startTracer(self, recover):
        """
        Select the output stream for the trace.

        :param recover: whether or not this is a recovery call (if so, a
                        named file is opened for appending instead of
                        being truncated)
        """
        destination = self.filename
        if destination is None:
            self.verb_file = sys.stdout
        elif isinstance(destination, str):
            self.verb_file = open(destination, 'a+' if recover else 'w')
        else:
            # Anything that is neither None nor a path string is used
            # directly as the stream to write to.
            self.verb_file = destination

    def stopTracer(self):
        """
        Stop the tracer by flushing the output stream.
        """
        self.verb_file.flush()

    def trace(self, time, text):
        """
        Write one trace entry to the output stream.

        A ``TIME:`` header line is written first whenever ``time`` is
        greater than the time of the previously written entry.

        :param time: time at which this trace happened; its first element
                     is what gets printed in the header
        :param text: the text that was traced
        """
        header = ""
        if time > self.prevtime:
            header = "TIME: %10.6f\n" % time[0]
            self.prevtime = time
        self.verb_file.write(header + text + "\n")

    def _vesselStr(self, vessel):
        """
        Format a vessel as a single comma-separated line.

        :param vessel: object whose ``tuple()`` supplies the eight values
        :returns: the formatted line
        """
        # VESSEL: mmsi, name, source, target, task, total distance, distance left, velocity
        layout = "%s, %s, %s, %s, %s, %.6f, %.6f, %.6f"
        return layout % vessel.tuple()

    def traceInternal(self, aDEVS):
        """
        Tracing done for the internal transition function.

        Only transitions of a Clock produce output: every vessel held by
        the Pool and Sailer models in the Clock's parent is traced.

        :param aDEVS: the model that transitioned
        """
        if not isinstance(aDEVS, Clock):
            return
        for sibling in aDEVS.parent.component_set:
            if isinstance(sibling, Pool):
                vessels = sibling.state["waiting"].values()
            elif isinstance(sibling, Sailer):
                vessels = sibling.state["vessels"]
            else:
                continue
            for vessel in vessels:
                quoted = '"' + self._vesselStr(vessel) + '"'
                runTraceAtController(self.server, self.uid, aDEVS,
                                     [aDEVS.time_last, quoted])

    def traceInit(self, aDEVS, t):
        """
        Tracing done at model initialisation.

        Only a Scheduler produces output: its starting time is stored and
        announced with a ``START:`` line.

        :param aDEVS: the model that was initialised
        :param t: not used by this tracer
        """
        if not isinstance(aDEVS, Scheduler):
            return
        # NOTE(review): the division by 1000 looks like a milliseconds to
        # seconds conversion - confirm against Scheduler.
        self.starting_time = aDEVS.starting_time / 1000
        announcement = '"START: %10.6f"' % self.starting_time
        runTraceAtController(self.server, self.uid, aDEVS,
                             [aDEVS.time_last, announcement])
@dataclass
class StreamedVessel:
    """
    Vessel record rebuilt from one comma-separated trace line.

    Field order on the line: mmsi, name, source, target, task,
    total distance, distance left, velocity.
    """
    mmsi: str
    name: str
    source: tuple  # pair of floats, or None when the line holds "None"
    target: tuple  # pair of floats, or None when the line holds "None"
    task: str
    total_distance: float
    distance_left: float
    velocity: float

    @staticmethod
    def _take_point(parts, end):
        """
        Read the point that ends just before index ``end``.

        :param parts: the line split on ", "
        :param end: exclusive end index of the point inside ``parts``
        :returns: ``(point, new_end)``; ``point`` is None or a pair of
                  floats and ``new_end`` is the index where it started
        """
        if parts[end - 1] == "None":
            return None, end - 1
        # A point is printed as "(x, y)", so the split leaves "(x" and "y)".
        first = float(parts[end - 2][1:])
        second = float(parts[end - 1][:-1])
        return (first, second), end - 2

    @staticmethod
    def from_str(args):
        """
        Parse one vessel line into a StreamedVessel.

        The line is read from the right-hand side, so a name that itself
        contains ", " does not shift the remaining fields.

        :param args: the line, with fields separated by ", "
        :returns: the parsed StreamedVessel
        :raises ValueError: if the line has too few fields or a numeric
                            field cannot be converted
        """
        parts = args.split(", ")
        # mmsi, name, source, target, task and three numbers: at least 8.
        if len(parts) < 8:
            raise ValueError("malformed vessel line: %r" % (args,))

        numbers = [float(x) for x in parts[-3:]]
        task = parts[-4]

        # Source and target take one or two slots each; walk backwards.
        end = len(parts) - 4
        target, end = StreamedVessel._take_point(parts, end)
        source, end = StreamedVessel._take_point(parts, end)
        if end < 2:
            raise ValueError("malformed vessel line: %r" % (args,))

        # Whatever sits between the mmsi and the source is the name.
        name = ", ".join(parts[1:end])
        return StreamedVessel(parts[0], name, source, target, task, *numbers)
class Streamer:
    """
    Write-only, file-like sink that decodes the trace text as it arrives.

    It keeps the most recent time, the announced starting time and the
    vessels reported since the last ``TIME:`` line.
    """

    def __init__(self):
        self.time = 0
        self.starting_time = 0
        self.vessels = []

    def write(self, text: str):
        """
        Consume a chunk of trace output.

        :param text: one or more newline-separated trace lines
        """
        for line in text.split("\n"):
            if not line:
                continue
            if line.startswith("TIME: "):
                # A new time step begins: drop the vessels of the old one.
                self.time = float(line[6:])
                self.vessels.clear()
                continue
            if line.startswith("START: "):
                self.starting_time = float(line[7:])
                continue
            # Any other non-empty line is treated as a vessel record.
            vessel = StreamedVessel.from_str(line)
            already_listed = any(known.mmsi == vessel.mmsi
                                 for known in self.vessels)
            if not already_listed:
                self.vessels.append(vessel)

    def flush(self):
        """Do nothing; present so the object can stand in for a stream."""
|