123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156 |
- import json
- import time
- import threading
- import paho.mqtt.client as mqtt
- from sys import stdout
- from loguru import logger
- from dotenv import load_dotenv
- from pypdevs.simulator import Simulator
- from devs_models.fischertechnik_factory import FischertechnikFactory
- from utils.flowchart_generator import FlowchartGenerator
- from config import load_config # Local config.py using dotenv
- load_dotenv()
- CFG = load_config()
- class MQTTSimulationBridge:
- def __init__(self):
- self.sim = None
- self.sim_client = mqtt.Client(mqtt.CallbackAPIVersion.VERSION2, client_id="sim_bridge")
- self.real_client = mqtt.Client(mqtt.CallbackAPIVersion.VERSION2, client_id="real_bridge")
- self.sim_client.on_message = self._on_sim
- self.real_client.on_message = self._on_real
- # vars to keep track of real factory state, and notifications to the simulation
- self.is_new_wp_notified: bool = False # if there is new input, will be set to true to prevent double notification
- self.is_dso_busy: bool = False # whether there is a workpiece in the DSO (output tray)
- def _on_sim(self, _cli, _u, msg):
- json_string = json.dumps({
- "topic": msg.topic,
- "payload": msg.payload.decode(),
- "origin": "sim"
- })
- if self.sim:
- logger.info(f"Received SIM MQTT message, topic: {msg.topic}")
- self.sim.realtime_interrupt(f"REALTIME_INTERRUPT {json_string}") # Interrupts can only send strings
- def _on_real(self, _cli, _u, msg):
- data = json.loads(msg.payload.decode())
-
- # --- Messages directly passed through to the simulation ---
- if msg.topic == "f/o/order": # orders placed in the real world should be passed to the simulation
- self._inject("f/o/order", data)
- return
- # --- Messages that are used to control the simulation ---
- # [Topic]: simulation/ctrl/<station>
- command: dict = {"action": ""}
- # Input: new workpiece
- if msg.topic == "f/i/state/dsi":
- if data.get("active") == 1 and data.get("code") == 0: # new workpiece
- if not self.is_new_wp_notified:
- command["action"] = "workpiece_arrived"
- self._inject("simulation/ctrl/dsi", command)
- self.is_new_wp_notified = True
- elif data.get("active") == 1 and data.get("code") == 1: # workpiece is being picked up -> unset flag
- self.is_new_wp_notified = False
- # Output Clear
- elif msg.topic == "f/i/state/dso": # TODO: check on state change
- if data.get("active") == 0 and data.get("code") == 1:
- if self.is_dso_busy:
- command["action"] = "clear"
- self._inject("simulation/ctrl/dso", command) # simulation dso should be cleared (sync with reality)
- self.is_dso_busy = False
- elif data.get("active") == 0 and data.get("code") == 0:
- self.is_dso_busy = True
- elif msg.topic == "f/i/nfc/ds": # TODO: check content if color or id update
- print(data)
- if data.get("workpiece").get("type") == "NONE":
- command["action"] = "id_update"
- command["workpiece"] = data.get("workpiece")
- elif data.get("workpiece"):
- command["action"] = "color_update"
- command["workpiece"] = data.get("workpiece")
-
- self._inject("simulation/ctrl/nfc", command)
-
- def _inject(self, topic: str, payload, origin: str='real'):
- """Injects a MQTT message into the simulation."""
- json_string = json.dumps({
- "topic": topic,
- "payload": json.dumps(payload),
- "origin": origin
- })
- self.sim.realtime_interrupt(f"REALTIME_INTERRUPT {json_string}")
- def start_clients(self):
- self.sim_client.connect(CFG.MQTT_SIM.HOST, CFG.MQTT_SIM.PORT)
- self.real_client.connect(CFG.MQTT_REAL.HOST, CFG.MQTT_REAL.PORT)
- self.sim_client.subscribe("simulation/#")
- self.real_client.subscribe("f/i/#") # TODO: check if I actually may narrow this down
- self.real_client.subscribe("f/o/#") # orders
- self.sim_client.loop_start()
- self.real_client.loop_start()
- def publish_mqtt(self, data):
- try:
- data = json.loads(data[0])
- self.sim_client.publish(data["topic"], json.dumps(data["payload"]))
- except Exception as e:
- logger.warning(f"Publish error: {e}")
- def start_simulation(self):
- logger.info("Starting simulation...")
- model = FischertechnikFactory("FischertechnikFactory", automatic=False)
- FlowchartGenerator(model).generate_file()
- self.sim = Simulator(model)
- self.sim.setRealTime(True)
- self.sim.setRealTimePorts({"REALTIME_INTERRUPT": model.mqtt_control.REALTIME_INTERRUPT})
- self.sim.setListenPorts(model.REALTIME_OBSERVED, self.publish_mqtt)
- self.sim.setRealTimePlatformThreads()
- #self.sim.setVerbose(filename=None)
- self.sim.simulate()
- logger.info("Simulation started")
- def run(self):
- # Enable logging traces, and change the style
- logger.remove()
- logger.add(
- stdout,
- format="<green>{time:YYYY-MM-DD HH:mm:ss.SSS}</green> | <level>{level:<8}</level> | <level>{message}</level>",
- colorize=True,
- level="TRACE"
- )
- #level="DEBUG" if CFG.LOGGING else "INFO"
- # Start MQTT clients in a separate thread
- threading.Thread(target=self.start_clients, daemon=True).start()
- self.start_simulation()
- # Keep the main thread alive until a KeyboardInterrupt is received
- try:
- while True:
- time.sleep(1)
- except KeyboardInterrupt:
- logger.info("Simulation stopped")
- logger.info("HBW inventory:")
- logger.info(self.sim.model.get_inventory())
- logger.info("Collector contents:")
- logger.info(self.sim.model.collector.get_contents())
- if __name__ == "__main__":
- bridge = MQTTSimulationBridge()
- bridge.run()
|