TwinObject.py 2.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778
  1. import pandas as pd
  2. from networking.mqtt import MQTTClient
  3. import networking as net
  4. from twin.elements import *
  5. from pypdevs.DEVS import CoupledDEVS
  6. class Port(CoupledDEVS):
  7. def __init__(self, name, client):
  8. super(Port, self).__init__(name)
  9. self.scheduler = self.addSubModel(net.Subscriber("tasks"))
  10. self.scheduler.transform = lambda m: pd.Series(data=m, index=["mmsi", "start", "ETA", "source", "target", "task"])
  11. self.planner = self.addSubModel(RoutePlanner("planner"))
  12. self.pool = self.addSubModel(Pool("pool"))
  13. self.sailer = self.addSubModel(Sailer("sailer"))
  14. self.pub = self.addSubModel(net.Publisher("pub", "vessels", client, self.transform_vessel_to_pub))
  15. self.error = self.addSubModel(net.Publisher("error", "error", client))
  16. # self.connectPorts(self.tracer.reqs, self.pool.req_in)
  17. self.connectPorts(self.scheduler.out, self.planner.req_in)
  18. self.connectPorts(self.planner.req_out, self.pool.req_in)
  19. self.connectPorts(self.pool.vessel_out, self.sailer.vessel_in)
  20. self.connectPorts(self.sailer.vessel_out, self.pool.vessel_in)
  21. # TODO: add a Sailer connection that outputs the full vessel bag upon input
  22. self.connectPorts(self.pool.vessel_out, self.pub.inp)
  23. self.connectPorts(self.pool.error, self.error.inp)
  24. # self.clock = self.addSubModel(Clock("clock"))
  25. # self.connectPorts(self.clock.outp, self.sailer.update)
  26. @staticmethod
  27. def transform_vessel_to_pub(vessel, time):
  28. return [
  29. vessel.start,
  30. vessel.mmsi,
  31. vessel.name,
  32. vessel.task,
  33. vessel.velocity,
  34. vessel.source,
  35. vessel.target,
  36. vessel.start,
  37. vessel.ETA
  38. ]
  39. def select(self, imm_children):
  40. for child in imm_children:
  41. if isinstance(child, Sailer):
  42. return child
  43. for child in imm_children:
  44. if isinstance(child, Pool):
  45. return child
  46. return imm_children[0]
  47. if __name__ == '__main__':
  48. from pypdevs.simulator import Simulator
  49. client = MQTTClient("TwinObject")
  50. model = Port("PoA", client)
  51. sim = Simulator(model)
  52. sim.setRealTime(True)
  53. sim.setRealTimePorts({"tasks": model.scheduler.inp})
  54. proc = net.SubscriptionProcess(["tasks"], sim, client)
  55. print("Starting Simulation...")
  56. sim.simulate()
  57. while True:
  58. try:
  59. client.spin()
  60. proc.step()
  61. proc.interrupt()
  62. except KeyboardInterrupt:
  63. print("Terminating...")
  64. break
  65. model.pub.cleanup()
  66. proc.terminate()