# realtime_simulation.py (6.1 KB)
  1. import json
  2. import time
  3. import threading
  4. import paho.mqtt.client as mqtt
  5. from sys import stdout
  6. from loguru import logger
  7. from dotenv import load_dotenv
  8. from pypdevs.simulator import Simulator
  9. from devs_models.fischertechnik_factory import FischertechnikFactory
  10. from utils.flowchart_generator import FlowchartGenerator
  11. from config import load_config # Local config.py using dotenv
  12. load_dotenv()
  13. CFG = load_config()
  14. class MQTTSimulationBridge:
  15. def __init__(self):
  16. self.sim = None
  17. self.sim_client = mqtt.Client(mqtt.CallbackAPIVersion.VERSION2, client_id="sim_bridge")
  18. self.real_client = mqtt.Client(mqtt.CallbackAPIVersion.VERSION2, client_id="real_bridge")
  19. self.sim_client.on_message = self._on_sim
  20. self.real_client.on_message = self._on_real
  21. # vars to keep track of real factory state, and notifications to the simulation
  22. self.is_new_wp_notified: bool = False # if there is new input, will be set to true to prevent double notification
  23. self.is_dso_busy: bool = False # whether there is a workpiece in the DSO (output tray)
  24. def _on_sim(self, _cli, _u, msg):
  25. json_string = json.dumps({
  26. "topic": msg.topic,
  27. "payload": msg.payload.decode(),
  28. "origin": "sim"
  29. })
  30. if self.sim:
  31. logger.info(f"Received SIM MQTT message, topic: {msg.topic}")
  32. self.sim.realtime_interrupt(f"REALTIME_INTERRUPT {json_string}") # Interrupts can only send strings
  33. def _on_real(self, _cli, _u, msg):
  34. data = json.loads(msg.payload.decode())
  35. # --- Messages directly passed through to the simulation ---
  36. if msg.topic == "f/o/order": # orders placed in the real world should be passed to the simulation
  37. self._inject("f/o/order", data)
  38. return
  39. # --- Messages that are used to control the simulation ---
  40. # [Topic]: simulation/ctrl/<station>
  41. command: dict = {"action": ""}
  42. # Input: new workpiece
  43. if msg.topic == "f/i/state/dsi":
  44. if data.get("active") == 1 and data.get("code") == 0: # new workpiece
  45. if not self.is_new_wp_notified:
  46. command["action"] = "workpiece_arrived"
  47. self._inject("simulation/ctrl/dsi", command)
  48. self.is_new_wp_notified = True
  49. elif data.get("active") == 1 and data.get("code") == 1: # workpiece is being picked up -> unset flag
  50. self.is_new_wp_notified = False
  51. # Output Clear
  52. elif msg.topic == "f/i/state/dso": # TODO: check on state change
  53. if data.get("active") == 0 and data.get("code") == 1:
  54. if self.is_dso_busy:
  55. command["action"] = "clear"
  56. self._inject("simulation/ctrl/dso", command) # simulation dso should be cleared (sync with reality)
  57. self.is_dso_busy = False
  58. elif data.get("active") == 0 and data.get("code") == 0:
  59. self.is_dso_busy = True
  60. elif msg.topic == "f/i/nfc/ds": # TODO: check content if color or id update
  61. print(data)
  62. if data.get("workpiece").get("type") == "NONE":
  63. command["action"] = "id_update"
  64. command["workpiece"] = data.get("workpiece")
  65. elif data.get("workpiece"):
  66. command["action"] = "color_update"
  67. command["workpiece"] = data.get("workpiece")
  68. self._inject("simulation/ctrl/nfc", command)
  69. def _inject(self, topic: str, payload, origin: str='real'):
  70. """Injects a MQTT message into the simulation."""
  71. json_string = json.dumps({
  72. "topic": topic,
  73. "payload": json.dumps(payload),
  74. "origin": origin
  75. })
  76. self.sim.realtime_interrupt(f"REALTIME_INTERRUPT {json_string}")
  77. def start_clients(self):
  78. self.sim_client.connect(CFG.MQTT_SIM.HOST, CFG.MQTT_SIM.PORT)
  79. self.real_client.connect(CFG.MQTT_REAL.HOST, CFG.MQTT_REAL.PORT)
  80. self.sim_client.subscribe("simulation/#")
  81. self.real_client.subscribe("f/i/#") # TODO: check if I actually may narrow this down
  82. self.real_client.subscribe("f/o/#") # orders
  83. self.sim_client.loop_start()
  84. self.real_client.loop_start()
  85. def publish_mqtt(self, data):
  86. try:
  87. data = json.loads(data[0])
  88. self.sim_client.publish(data["topic"], json.dumps(data["payload"]))
  89. except Exception as e:
  90. logger.warning(f"Publish error: {e}")
  91. def start_simulation(self):
  92. logger.info("Starting simulation...")
  93. model = FischertechnikFactory("FischertechnikFactory", automatic=False)
  94. FlowchartGenerator(model).generate_file()
  95. self.sim = Simulator(model)
  96. self.sim.setRealTime(True)
  97. self.sim.setRealTimePorts({"REALTIME_INTERRUPT": model.mqtt_control.REALTIME_INTERRUPT})
  98. self.sim.setListenPorts(model.REALTIME_OBSERVED, self.publish_mqtt)
  99. self.sim.setRealTimePlatformThreads()
  100. #self.sim.setVerbose(filename=None)
  101. self.sim.simulate()
  102. logger.info("Simulation started")
  103. def run(self):
  104. # Enable logging traces, and change the style
  105. logger.remove()
  106. logger.add(
  107. stdout,
  108. format="<green>{time:YYYY-MM-DD HH:mm:ss.SSS}</green> | <level>{level:<8}</level> | <level>{message}</level>",
  109. colorize=True,
  110. level="TRACE"
  111. )
  112. #level="DEBUG" if CFG.LOGGING else "INFO"
  113. # Start MQTT clients in a separate thread
  114. threading.Thread(target=self.start_clients, daemon=True).start()
  115. self.start_simulation()
  116. # Keep the main thread alive until a KeyboardInterrupt is received
  117. try:
  118. while True:
  119. time.sleep(1)
  120. except KeyboardInterrupt:
  121. logger.info("Simulation stopped")
  122. logger.info("HBW inventory:")
  123. logger.info(self.sim.model.get_inventory())
  124. logger.info("Collector contents:")
  125. logger.info(self.sim.model.collector.get_contents())
  126. if __name__ == "__main__":
  127. bridge = MQTTSimulationBridge()
  128. bridge.run()