Browse Source

Add second broker listening to realtime simulation + basis for external message injection

anfeny 2 months ago
parent
commit
d8c2b640e5

+ 3 - 2
dashboard/src/static/js/dashboard.js

@@ -268,7 +268,8 @@ function getTimestamp() {
 }
 
 /**
- * // Place an order for a piece
+ * Place an order for a piece
+ * ! This order will be placed in the REAL mqtt broker
  * @param color 'RED' or 'BLUE' or 'WHITE'
  */
 function placeOrder(color, topic = 'f/o/order') {
@@ -277,7 +278,7 @@ function placeOrder(color, topic = 'f/o/order') {
         ts: getTimestamp()
     };
     message = JSON.stringify(message);
-    publishMqtt(topic, message);
+    publishMqtt(topic, message, target = 'real');
 }
 
 /**

+ 2 - 0
simulator/data_models/mqtt_message.py

@@ -24,6 +24,8 @@ class MqttMessage:
             message = json.loads(message)
             self.topic = message['topic']
             self.payload = json.loads(message['payload'])
+            if 'origin' in message:
+                self.origin = message['origin']
 
     def to_json(self) -> str:
         total_msg = dict()

+ 38 - 38
simulator/flowchart.md

@@ -4,17 +4,17 @@ title: FischertechnikFactory
 ---
 flowchart LR
 	subgraph Generator
-		Generator_mqtt_in("mqtt_in")
 		Generator_out("out")
+		Generator_mqtt_in("mqtt_in")
 		Generator_mqtt_in ~~~ Generator_out
 	end
 	Generator_out --> DSI_inp
 	subgraph DSI
-		DSI_inp("inp")
-		DSI_vgr_in("vgr_in")
 		DSI_mqtt_out("mqtt_out")
-		DSI_mqtt_in("mqtt_in")
 		DSI_out("out")
+		DSI_vgr_in("vgr_in")
+		DSI_mqtt_in("mqtt_in")
+		DSI_inp("inp")
 		DSI_inp ~~~ DSI_out
 		DSI_mqtt_in ~~~ DSI_mqtt_out
 	end
@@ -22,10 +22,10 @@ flowchart LR
 	DSI_mqtt_out --> MQTTControlUnit_mqtt_in
 	subgraph ReaderStation
 		ReaderStation_color_in("color_in")
+		ReaderStation_nfc_out("nfc_out")
+		ReaderStation_nfc_in("nfc_in")
 		ReaderStation_mqtt_out("mqtt_out")
 		ReaderStation_color_out("color_out")
-		ReaderStation_nfc_in("nfc_in")
-		ReaderStation_nfc_out("nfc_out")
 		ReaderStation_nfc_in ~~~ ReaderStation_nfc_out
 		ReaderStation_color_in ~~~ ReaderStation_color_out
 	end
@@ -33,24 +33,24 @@ flowchart LR
 	ReaderStation_color_out --> VacuumGripper_color_sensor_in
 	ReaderStation_mqtt_out --> MQTTControlUnit_mqtt_in
 	subgraph VacuumGripper
-		VacuumGripper_color_sensor_in("color_sensor_in")
-		VacuumGripper_sld_red_out("sld_red_out")
-		VacuumGripper_nfc_in("nfc_in")
-		VacuumGripper_ct_out("ct_out")
-		VacuumGripper_dsi_out("dsi_out")
-		VacuumGripper_dso_out("dso_out")
-		VacuumGripper_mpo_out("mpo_out")
-		VacuumGripper_sld_blue_out("sld_blue_out")
+		VacuumGripper_color_sensor_out("color_sensor_out")
 		VacuumGripper_ct_in("ct_in")
+		VacuumGripper_mpo_out("mpo_out")
+		VacuumGripper_nfc_in("nfc_in")
+		VacuumGripper_sld_red_out("sld_red_out")
 		VacuumGripper_dsi_in("dsi_in")
-		VacuumGripper_sld_white_in("sld_white_in")
-		VacuumGripper_nfc_out("nfc_out")
+		VacuumGripper_color_sensor_in("color_sensor_in")
+		VacuumGripper_sld_blue_out("sld_blue_out")
+		VacuumGripper_sld_red_in("sld_red_in")
 		VacuumGripper_sld_white_out("sld_white_out")
 		VacuumGripper_sld_blue_in("sld_blue_in")
-		VacuumGripper_color_sensor_out("color_sensor_out")
-		VacuumGripper_mqtt_in("mqtt_in")
+		VacuumGripper_dso_out("dso_out")
+		VacuumGripper_sld_white_in("sld_white_in")
 		VacuumGripper_mqtt_out("mqtt_out")
-		VacuumGripper_sld_red_in("sld_red_in")
+		VacuumGripper_mqtt_in("mqtt_in")
+		VacuumGripper_dsi_out("dsi_out")
+		VacuumGripper_ct_out("ct_out")
+		VacuumGripper_nfc_out("nfc_out")
 		VacuumGripper_dsi_in ~~~ VacuumGripper_dsi_out
 		VacuumGripper_ct_in ~~~ VacuumGripper_ct_out
 		VacuumGripper_nfc_in ~~~ VacuumGripper_nfc_out
@@ -71,11 +71,11 @@ flowchart LR
 	VacuumGripper_dso_out --> DSO_inp
 	VacuumGripper_mqtt_out --> MQTTControlUnit_mqtt_in
 	subgraph Transporter
+		Transporter_left_in("left_in")
+		Transporter_right_in("right_in")
 		Transporter_right_out("right_out")
 		Transporter_left_out("left_out")
-		Transporter_left_in("left_in")
 		Transporter_mqtt_out("mqtt_out")
-		Transporter_right_in("right_in")
 		Transporter_left_in ~~~ Transporter_right_out
 		Transporter_right_in ~~~ Transporter_left_out
 	end
@@ -83,11 +83,11 @@ flowchart LR
 	Transporter_left_out --> HighBayWarehouse_inp
 	Transporter_mqtt_out --> MQTTControlUnit_mqtt_in
 	subgraph HighBayWarehouse
+		HighBayWarehouse_mqtt_in("mqtt_in")
 		HighBayWarehouse_mqtt_out("mqtt_out")
 		HighBayWarehouse_inventory_out("inventory_out")
 		HighBayWarehouse_inp("inp")
 		HighBayWarehouse_out("out")
-		HighBayWarehouse_mqtt_in("mqtt_in")
 		HighBayWarehouse_inp ~~~ HighBayWarehouse_out
 		HighBayWarehouse_mqtt_in ~~~ HighBayWarehouse_mqtt_out
 	end
@@ -101,26 +101,26 @@ flowchart LR
 	end
 	InventoryPublisher_mqtt_out --> MQTTControlUnit_mqtt_in
 	subgraph MPO
+		MPO_vgr_in("vgr_in")
 		MPO_mqtt_out("mqtt_out")
 		MPO_conveyor_out("conveyor_out")
-		MPO_vgr_in("vgr_in")
 		MPO_vgr_in ~~~ MPO_mqtt_out
 		subgraph MPO_oven
-			MPO_oven_gripper_in("gripper_in")
 			MPO_oven_gripper_out("gripper_out")
 			MPO_oven_vgr_in("vgr_in")
 			MPO_oven_mqtt_out("mqtt_out")
+			MPO_oven_gripper_in("gripper_in")
 			MPO_oven_vgr_in ~~~ MPO_oven_gripper_out
 			MPO_oven_gripper_in ~~~ MPO_oven_mqtt_out
 		end
 		MPO_oven_gripper_out --> MPO_gripper_oven_in
 		MPO_oven_mqtt_out --> MPO_mqtt_out
 		subgraph MPO_gripper
+			MPO_gripper_table_out("table_out")
+			MPO_gripper_mqtt_out("mqtt_out")
 			MPO_gripper_oven_in("oven_in")
 			MPO_gripper_oven_out("oven_out")
 			MPO_gripper_table_in("table_in")
-			MPO_gripper_table_out("table_out")
-			MPO_gripper_mqtt_out("mqtt_out")
 			MPO_gripper_oven_in ~~~ MPO_gripper_oven_out
 			MPO_gripper_table_in ~~~ MPO_gripper_table_out
 		end
@@ -128,10 +128,10 @@ flowchart LR
 		MPO_gripper_table_out --> MPO_saw_gripper_in
 		MPO_gripper_mqtt_out --> MPO_mqtt_out
 		subgraph MPO_saw
-			MPO_saw_conveyor_out("conveyor_out")
-			MPO_saw_gripper_in("gripper_in")
 			MPO_saw_mqtt_out("mqtt_out")
 			MPO_saw_gripper_out("gripper_out")
+			MPO_saw_conveyor_out("conveyor_out")
+			MPO_saw_gripper_in("gripper_in")
 			MPO_saw_gripper_in ~~~ MPO_saw_gripper_out
 		end
 		MPO_saw_gripper_out --> MPO_gripper_table_in
@@ -142,9 +142,9 @@ flowchart LR
 	MPO_conveyor_out --> Conveyor_inp
 	MPO_vgr_in --> MPO_oven_vgr_in
 	subgraph Conveyor
+		Conveyor_out("out")
 		Conveyor_mqtt_out("mqtt_out")
 		Conveyor_inp("inp")
-		Conveyor_out("out")
 		Conveyor_inp ~~~ Conveyor_out
 	end
 	Conveyor_out --> SLD_inp
@@ -163,11 +163,11 @@ flowchart LR
 		SLD_red_in ~~~ SLD_blue_out
 		SLD_blue_in ~~~ SLD_mqtt_out
 		subgraph SLD_conveyor
+			SLD_conveyor_inp("inp")
+			SLD_conveyor_out_white("out_white")
 			SLD_conveyor_out_red("out_red")
 			SLD_conveyor_out_blue("out_blue")
 			SLD_conveyor_mqtt_out("mqtt_out")
-			SLD_conveyor_inp("inp")
-			SLD_conveyor_out_white("out_white")
 			SLD_conveyor_inp ~~~ SLD_conveyor_out_white
 		end
 		SLD_conveyor_out_white --> SLD_white_bay_sld_in
@@ -185,20 +185,20 @@ flowchart LR
 		SLD_white_bay_vgr_out --> SLD_white_out
 		SLD_white_bay_mqtt_out --> SLD_mqtt_out
 		subgraph SLD_red_bay
-			SLD_red_bay_vgr_in("vgr_in")
-			SLD_red_bay_mqtt_out("mqtt_out")
 			SLD_red_bay_vgr_out("vgr_out")
 			SLD_red_bay_sld_in("sld_in")
+			SLD_red_bay_mqtt_out("mqtt_out")
+			SLD_red_bay_vgr_in("vgr_in")
 			SLD_red_bay_sld_in ~~~ SLD_red_bay_vgr_out
 			SLD_red_bay_vgr_in ~~~ SLD_red_bay_mqtt_out
 		end
 		SLD_red_bay_vgr_out --> SLD_red_out
 		SLD_red_bay_mqtt_out --> SLD_mqtt_out
 		subgraph SLD_blue_bay
+			SLD_blue_bay_sld_in("sld_in")
 			SLD_blue_bay_mqtt_out("mqtt_out")
 			SLD_blue_bay_vgr_in("vgr_in")
 			SLD_blue_bay_vgr_out("vgr_out")
-			SLD_blue_bay_sld_in("sld_in")
 			SLD_blue_bay_sld_in ~~~ SLD_blue_bay_vgr_out
 			SLD_blue_bay_vgr_in ~~~ SLD_blue_bay_mqtt_out
 		end
@@ -214,19 +214,19 @@ flowchart LR
 	SLD_red_in --> SLD_red_bay_vgr_in
 	SLD_blue_in --> SLD_blue_bay_vgr_in
 	subgraph DSO
-		DSO_mqtt_in("mqtt_in")
 		DSO_out("out")
-		DSO_mqtt_out("mqtt_out")
+		DSO_mqtt_in("mqtt_in")
 		DSO_inp("inp")
+		DSO_mqtt_out("mqtt_out")
 		DSO_inp ~~~ DSO_out
 		DSO_mqtt_in ~~~ DSO_mqtt_out
 	end
 	DSO_mqtt_out --> MQTTControlUnit_mqtt_in
 	subgraph MQTTControlUnit
-		MQTTControlUnit_mqtt_out("mqtt_out")
-		MQTTControlUnit_REALTIME_OBSERVED("REALTIME_OBSERVED")
 		MQTTControlUnit_REALTIME_INTERRUPT("REALTIME_INTERRUPT")
 		MQTTControlUnit_mqtt_in("mqtt_in")
+		MQTTControlUnit_REALTIME_OBSERVED("REALTIME_OBSERVED")
+		MQTTControlUnit_mqtt_out("mqtt_out")
 		MQTTControlUnit_mqtt_in ~~~ MQTTControlUnit_mqtt_out
 		MQTTControlUnit_REALTIME_INTERRUPT ~~~ MQTTControlUnit_REALTIME_OBSERVED
 	end

+ 83 - 58
simulator/realtime_simulation.py

@@ -4,67 +4,94 @@ import threading
 import paho.mqtt.client as mqtt
 from sys import stdout
 from loguru import logger
+from dotenv import load_dotenv
 from pypdevs.simulator import Simulator
-from devs_models.fischertechnik_factory import FischertechnikFactory, FischertechnikFactory
+from devs_models.fischertechnik_factory import FischertechnikFactory
 from utils.flowchart_generator import FlowchartGenerator
+from config import load_config  # Local config.py using dotenv
+
+load_dotenv()
+CFG = load_config()
 
 
 class MQTTSimulationBridge:
-    def __init__(self, host: str = '127.0.0.1', port: int = 1883, topic: str = "#", retry_interval: int = 5):
-        self.host = host
-        self.port = port
-        self.topic = topic
-        self.retry_interval = retry_interval
-        self.sim: Simulator | None = None
-        self.mqtt_client = mqtt.Client(mqtt.CallbackAPIVersion.VERSION2)
-
-        # MQTT callbacks
-        self.mqtt_client.on_message = self.on_message
-        self.mqtt_client.on_connect = self.on_connect
-        self.mqtt_client.on_disconnect = self.on_disconnect
-
-    def on_message(self, client, userdata, message):
-        msg = dict()
-        msg["topic"] = message.topic
-        msg["payload"] = message.payload.decode()
-        json_string = json.dumps(msg) # Interrupts can only send strings
-
-        topic_filter = ['simulation/spawn', 'simulation/get_status', 'simulation/clear_dso', 'f/o/order']
-        if message.topic in topic_filter and self.sim is not None:
-            logger.info(f"MQTT message received: topic={msg['topic']}, message={msg['payload']}")
+    def __init__(self):
+        self.sim = None
+        self.sim_client = mqtt.Client(mqtt.CallbackAPIVersion.VERSION2, client_id="sim_bridge")
+        self.real_client = mqtt.Client(mqtt.CallbackAPIVersion.VERSION2, client_id="real_bridge")
+
+        self.sim_client.on_message = self._on_sim
+        self.real_client.on_message = self._on_real
+
+    def _on_sim(self, _cli, _u, msg):
+        json_string = json.dumps({
+            "topic": msg.topic,
+            "payload": msg.payload.decode(),
+            "origin": "sim"
+        })
+
+        allowed_topics = ['simulation/spawn', 'simulation/get_status', 'simulation/clear_dso', 'f/o/order']
+        if msg.topic in allowed_topics and self.sim:
+            logger.info(f"SIM topic: {msg.topic}")
+            self.sim.realtime_interrupt(f"REALTIME_INTERRUPT {json_string}") # Interrupts can only send strings
+
+    def _on_real(self, _cli, _u, msg):
+        data = json.loads(msg.payload.decode())
+        
+        # Directly pass through some messages
+        if msg.topic == "f/o/order": # orders placed in the real world should be passed to the simulation
+            json_string = json.dumps({
+                "topic": msg.topic,
+                "payload": msg.payload.decode(),
+                "origin": "real"
+            })
             self.sim.realtime_interrupt(f"REALTIME_INTERRUPT {json_string}")
+            return
+
+        # Inject selected real events as control messages directly into the simulation
+        # -> Topic: simulation/ctrl/<station>
+
+        if msg.topic == "f/i/state/dsi" and data.get("active") == 1:
+            action = "workpiece_arrived"
+            self._inject("simulation/ctrl/dsi", action)
+
+        elif msg.topic == "f/i/state/dso" and data.get("active") == 0: # TODO: check on state change
+            action = "cleared"
+            self._inject("simulation/ctrl/dso", action)
+
+        elif msg.topic == "f/i/nfc/ds": # TODO: check content if color or id update
+            self._inject("simualtion/ctrl/nfc", "color_update")
 
-    def on_connect(self, client, userdata, flags, rc, properties):
-        if rc == 0:
-            logger.info("Connected to MQTT broker")
-            self.mqtt_client.subscribe(self.topic)
-        else:
-            logger.error(f"Failed to connect with result code {rc}")
+        
+    def _inject(self, topic: str, payload, origin: str='real'):
+        """Injects a MQTT message into the simulation."""
+        json_string = json.dumps({
+                "topic": topic,
+                "payload": {},
+                "origin": origin
+            })
+        self.sim.realtime_interrupt(f"REALTIME_INTERRUPT {json_string}")
 
-    def on_disconnect(self, client, userdata, rc):
-        logger.warning("Disconnected from MQTT broker. Retrying connection...")
-        self.start_mqtt_connection()
+
+    def start_clients(self):
+        self.sim_client.connect(CFG.MQTT_SIM.HOST, CFG.MQTT_SIM.PORT)
+        self.real_client.connect(CFG.MQTT_REAL.HOST, CFG.MQTT_REAL.PORT)
+
+        self.sim_client.subscribe("simulation/#")
+        self.real_client.subscribe("f/i/#") # TODO: check if I actually may narrow this down
+        self.real_client.subscribe("f/o/#") # orders
+        self.sim_client.loop_start()
+        self.real_client.loop_start()
 
     def publish_mqtt(self, data):
-        #logger.info(f"Publishing message to MQTT: {data}")
-        data = json.loads(data[0])
-        topic = data["topic"]
-        payload = data["payload"]
-        self.mqtt_client.publish(topic, json.dumps(payload))
-
-    def start_mqtt_connection(self):
-        while True:
-            try:
-                logger.info(f"Attempting to connect to MQTT broker at {self.host}:{self.port} ...")
-                self.mqtt_client.connect(self.host, self.port, 60)
-                self.mqtt_client.subscribe(self.topic)
-                self.mqtt_client.loop_start()
-                break
-            except Exception as e:
-                logger.warning(f"MQTT connection failed: {e}")
-                time.sleep(self.retry_interval)
+        try:
+            data = json.loads(data[0])
+            self.sim_client.publish(data["topic"], json.dumps(data["payload"]))
+        except Exception as e:
+            logger.warning(f"Publish error: {e}")
 
     def start_simulation(self):
+        logger.info("Starting simulation...")
         model = FischertechnikFactory("FischertechnikFactory", automatic=False)
         FlowchartGenerator(model).generate_file()
 
@@ -75,23 +102,21 @@ class MQTTSimulationBridge:
         self.sim.setRealTimePlatformThreads()
         #self.sim.setVerbose(filename=None)
         self.sim.simulate()
+        logger.info("Simulation started")
 
     def run(self):
-        # Enable logging traces
+        # Enable logging traces, and change the style
         logger.remove()
-        # Add a new logger without file/line info but with colorization enabled
         logger.add(
             stdout,
             format="<green>{time:YYYY-MM-DD HH:mm:ss.SSS}</green> | <level>{level:<8}</level> | <level>{message}</level>",
             colorize=True,
             level="TRACE"
         )
+        #level="DEBUG" if CFG.LOGGING else "INFO"
 
-        # Start MQTT connection in a separate thread
-        mqtt_thread = threading.Thread(target=self.start_mqtt_connection, daemon=True)
-        mqtt_thread.start()
-
-        # Start the simulation (which runs in its own daemon thread)
+        # Start MQTT clients in a separate thread
+        threading.Thread(target=self.start_clients, daemon=True).start()
         self.start_simulation()
 
         # Keep the main thread alive until a KeyboardInterrupt is received
@@ -99,8 +124,8 @@ class MQTTSimulationBridge:
             while True:
                 time.sleep(1)
         except KeyboardInterrupt:
-            logger.info("Simulation finished")
-            logger.info("Warehouse collected:")
+            logger.info("Simulation stopped")
+            logger.info("Collected inventory:")
             logger.info(self.sim.model.get_inventory())