| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778 |
- import pandas as pd
- from networking.mqtt import MQTTClient
- import networking as net
- from twin.elements import *
- from pypdevs.DEVS import CoupledDEVS
class Port(CoupledDEVS):
    """Coupled DEVS model that wires task intake, planning, pooling, sailing and publishing.

    Sub-models created in ``__init__`` (in this order): a ``net.Subscriber``
    (``scheduler``), a ``RoutePlanner`` (``planner``), a ``Pool`` (``pool``),
    a ``Sailer`` (``sailer``) and two ``net.Publisher`` instances (``pub`` and
    ``error``).
    """

    def __init__(self, name, client):
        """Create all sub-models and connect their ports.

        Args:
            name: Model name, forwarded to ``CoupledDEVS.__init__``.
            client: Object handed to both ``net.Publisher`` sub-models
                (the script at the bottom of this file passes an
                ``MQTTClient``).
        """
        super(Port, self).__init__(name)

        # Labels applied to every incoming message when it is wrapped
        # in a pandas Series.
        task_fields = ["mmsi", "start", "ETA", "source", "target", "task"]

        self.scheduler = self.addSubModel(net.Subscriber("tasks"))
        self.scheduler.transform = lambda m: pd.Series(data=m, index=task_fields)

        self.planner = self.addSubModel(RoutePlanner("planner"))
        self.pool = self.addSubModel(Pool("pool"))
        self.sailer = self.addSubModel(Sailer("sailer"))

        self.pub = self.addSubModel(
            net.Publisher("pub", "vessels", client, self.transform_vessel_to_pub)
        )
        self.error = self.addSubModel(net.Publisher("error", "error", client))

        # Disabled wiring, kept for reference:
        # self.connectPorts(self.tracer.reqs, self.pool.req_in)
        # self.clock = self.addSubModel(Clock("clock"))
        # self.connectPorts(self.clock.outp, self.sailer.update)

        # TODO: add a Sailer connection that outputs the full vessel bag upon input
        links = (
            (self.scheduler.out, self.planner.req_in),
            (self.planner.req_out, self.pool.req_in),
            (self.pool.vessel_out, self.sailer.vessel_in),
            (self.sailer.vessel_out, self.pool.vessel_in),
            (self.pool.vessel_out, self.pub.inp),
            (self.pool.error, self.error.inp),
        )
        for source_port, target_port in links:
            self.connectPorts(source_port, target_port)

    @staticmethod
    def transform_vessel_to_pub(vessel, time):
        """Flatten a vessel into the list handed to the ``pub`` publisher.

        Args:
            vessel: Object exposing the attributes listed below.
            time: Accepted but not used by this function.

        Returns:
            A list of nine attribute values in a fixed order; ``start``
            appears at index 0 and again at index 7.
        """
        # NOTE(review): `time` is unused and `start` is emitted twice -- possibly
        # index 0 was meant to carry `time`; confirm against the consumers
        # of this payload before changing the layout.
        payload_fields = (
            "start",
            "mmsi",
            "name",
            "task",
            "velocity",
            "source",
            "target",
            "start",
            "ETA",
        )
        return [getattr(vessel, field) for field in payload_fields]

    def select(self, imm_children):
        """Pick one child out of ``imm_children``.

        Preference order: the first ``Sailer`` found, otherwise the first
        ``Pool`` found, otherwise the first element of ``imm_children``.
        """
        for preferred_type in (Sailer, Pool):
            match = next(
                (child for child in imm_children if isinstance(child, preferred_type)),
                None,
            )
            if match is not None:
                return match
        return imm_children[0]
if __name__ == '__main__':
    from pypdevs.simulator import Simulator

    # Build the client and the coupled model that publishes through it.
    client = MQTTClient("TwinObject")
    model = Port("PoA", client)

    # Real-time simulation; the "tasks" key is mapped to the subscriber's
    # input port so external messages can be injected into the model.
    sim = Simulator(model)
    sim.setRealTime(True)
    sim.setRealTimePorts({"tasks": model.scheduler.inp})
    proc = net.SubscriptionProcess(["tasks"], sim, client)

    print("Starting Simulation...")
    sim.simulate()

    # Service the client and the subscription process until Ctrl-C.
    running = True
    while running:
        try:
            client.spin()
            proc.step()
            proc.interrupt()
        except KeyboardInterrupt:
            print("Terminating...")
            running = False

    # Shutdown: runs only after the loop was left via KeyboardInterrupt.
    model.pub.cleanup()
    proc.terminate()
|