mqtt_control_unit.py 3.1 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677
  1. import json
  2. from collections import deque
  3. from dataclasses import dataclass, field
  4. from loguru import logger
  5. from pypdevs.DEVS import AtomicDEVS
  6. from pypdevs.infinity import INFINITY
  7. from data_models.mqtt_message import MqttMessage
  8. @dataclass
  9. class MQTTControlState:
  10. message_queue_internal: deque[MqttMessage] = deque() # messages from inside the simulation
  11. message_queue_external: deque[MqttMessage] = deque() # messages received via interrupts
  12. message_log: list[str] = field(default_factory=list)
  13. class MQTTControlUnit(AtomicDEVS):
  14. """ Mqtt Control Unit which receives messages which it publishes to its subscribers """
  15. def __init__(self, name: str):
  16. # name needs to be unique to refer to it
  17. super(MQTTControlUnit, self).__init__(name)
  18. self.mqtt_in = self.addInPort("mqtt_in") # incoming messages from individual models
  19. self.mqtt_out = self.addOutPort("mqtt_out") # outgoing messages, relayed to all models connected
  20. # Ports for realtime simulation
  21. self.REALTIME_INTERRUPT = self.addInPort(name="REALTIME_INTERRUPT")
  22. self.REALTIME_OBSERVED = self.addOutPort(name="REALTIME_OBSERVED")
  23. self.state = MQTTControlState()
  24. def extTransition(self, inputs):
  25. if self.mqtt_in in inputs:
  26. for msg in inputs[self.mqtt_in]:
  27. self.state.message_queue_internal.append(msg)
  28. if self.REALTIME_INTERRUPT in inputs:
  29. json_msg = json.dumps(inputs[self.REALTIME_INTERRUPT][0])
  30. msg = MqttMessage(json_msg)
  31. self.state.message_queue_external.append(msg)
  32. return self.state # important, return state
  33. def timeAdvance(self):
  34. # Next workpiece generation time
  35. if self.state.message_queue_internal or self.state.message_queue_external: # item in queue
  36. return 0.0 # immediate
  37. else:
  38. return INFINITY # idle
  39. def outputFnc(self):
  40. # Push external message to internal simulation
  41. if self.state.message_queue_external:
  42. msg = self.state.message_queue_external[0]
  43. logger.trace(f"{type(self).__name__} '{self.name}' outputs: {msg} TO SIMULATION")
  44. return {self.mqtt_out: [msg]}
  45. # Push internal message to outside the simulation
  46. elif self.state.message_queue_internal:
  47. msg: MqttMessage = self.state.message_queue_internal[0]
  48. #logger.trace(f"{type(self).__name__} '{self.name}' outputs: {msg} TO OUTSIDE")
  49. return {self.mqtt_out: [msg], self.REALTIME_OBSERVED: [msg.to_json()]}
  50. logger.error(f"No message in {type(self).__name__} '{self.name}'")
  51. def intTransition(self):
  52. # We have just output the message, so move it to the history log
  53. output_msg = ""
  54. if self.state.message_queue_external:
  55. output_msg = self.state.message_queue_external.popleft().to_json()
  56. elif self.state.message_queue_internal:
  57. output_msg = self.state.message_queue_internal.popleft().to_json()
  58. # TODO: enable if required, spams traces atm
  59. # self.state.message_log.append(output_msg)
  60. return self.state