1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677 |
- import json
- from collections import deque
- from dataclasses import dataclass, field
- from loguru import logger
- from pypdevs.DEVS import AtomicDEVS
- from pypdevs.infinity import INFINITY
- from data_models.mqtt_message import MqttMessage
- @dataclass
- class MQTTControlState:
- message_queue_internal: deque[MqttMessage] = deque() # messages from inside the simulation
- message_queue_external: deque[MqttMessage] = deque() # messages received via interrupts
- message_log: list[str] = field(default_factory=list)
- class MQTTControlUnit(AtomicDEVS):
- """ Mqtt Control Unit which receives messages which it publishes to its subscribers """
- def __init__(self, name: str):
- # name needs to be unique to refer to it
- super(MQTTControlUnit, self).__init__(name)
- self.mqtt_in = self.addInPort("mqtt_in") # incoming messages from individual models
- self.mqtt_out = self.addOutPort("mqtt_out") # outgoing messages, relayed to all models connected
- # Ports for realtime simulation
- self.REALTIME_INTERRUPT = self.addInPort(name="REALTIME_INTERRUPT")
- self.REALTIME_OBSERVED = self.addOutPort(name="REALTIME_OBSERVED")
- self.state = MQTTControlState()
- def extTransition(self, inputs):
- if self.mqtt_in in inputs:
- for msg in inputs[self.mqtt_in]:
- self.state.message_queue_internal.append(msg)
- if self.REALTIME_INTERRUPT in inputs:
- json_msg = json.dumps(inputs[self.REALTIME_INTERRUPT][0])
- msg = MqttMessage(json_msg)
- self.state.message_queue_external.append(msg)
- return self.state # important, return state
- def timeAdvance(self):
- # Next workpiece generation time
- if self.state.message_queue_internal or self.state.message_queue_external: # item in queue
- return 0.0 # immediate
- else:
- return INFINITY # idle
- def outputFnc(self):
- # Push external message to internal simulation
- if self.state.message_queue_external:
- msg = self.state.message_queue_external[0]
- logger.trace(f"{type(self).__name__} '{self.name}' outputs: {msg} TO SIMULATION")
- return {self.mqtt_out: [msg]}
- # Push internal message to outside the simulation
- elif self.state.message_queue_internal:
- msg: MqttMessage = self.state.message_queue_internal[0]
- #logger.trace(f"{type(self).__name__} '{self.name}' outputs: {msg} TO OUTSIDE")
- return {self.mqtt_out: [msg], self.REALTIME_OBSERVED: [msg.to_json()]}
- logger.error(f"No message in {type(self).__name__} '{self.name}'")
- def intTransition(self):
- # We have just output the message, so move it to the history log
- output_msg = ""
- if self.state.message_queue_external:
- output_msg = self.state.message_queue_external.popleft().to_json()
- elif self.state.message_queue_internal:
- output_msg = self.state.message_queue_internal.popleft().to_json()
- # TODO: enable if required, spams traces atm
- # self.state.message_log.append(output_msg)
- return self.state
|