test_model.py 2.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263
  1. from dataclasses import dataclass
  2. from loguru import logger
  3. from data_models.mqtt_message import MqttMessage
  4. from devs_models.mqtt_control_unit import MQTTControlUnit
  5. from pypdevs.DEVS import AtomicDEVS, CoupledDEVS
  6. from pypdevs.infinity import INFINITY
  7. class TestModel(CoupledDEVS):
  8. def __init__(self, name: str, automatic: bool = True):
  9. super(TestModel, self).__init__(name)
  10. # Add output port for MQTT
  11. self.REALTIME_OBSERVED = self.addOutPort('REALTIME_OBSERVED')
  12. # Create models
  13. self.mqtt_control: MQTTControlUnit = self.addSubModel(MQTTControlUnit("MQTTControlUnit"))
  14. self.test_model: TestModelAtomic = self.addSubModel(TestModelAtomic("TestModel"))
  15. # Connect MQTT ports
  16. self.connectPorts(self.mqtt_control.mqtt_out, self.test_model.mqtt_in)
  17. self.connectPorts(self.mqtt_control.REALTIME_OBSERVED, self.REALTIME_OBSERVED)
  18. @dataclass
  19. class TestModelState:
  20. repetition_delay: float
  21. delta_t: float = INFINITY
  22. class TestModelAtomic(AtomicDEVS):
  23. """ A test model which prints something on loop"""
  24. def __init__(self, name: str, repetition_delay: float = 5.0):
  25. # name needs to be unique to refer to it
  26. super(TestModelAtomic, self).__init__(name)
  27. self.mqtt_in = self.addInPort("mqtt_in")
  28. self.state = TestModelState(repetition_delay)
  29. self.state.delta_t = repetition_delay
  30. def extTransition(self, inputs):
  31. self.state.delta_t -= self.elapsed
  32. if self.mqtt_in in inputs:
  33. msg: MqttMessage = inputs[self.mqtt_in][0]
  34. if 'simulation' in msg.topic:
  35. logger.trace(f"TestModel '{self.name}' received: {msg}")
  36. return self.state # important, return state
  37. def timeAdvance(self):
  38. # Next internal transition time
  39. return self.state.delta_t
  40. def outputFnc(self):
  41. logger.critical("TestModel OUTPUT_FNC")
  42. return {}
  43. def intTransition(self):
  44. # We have just output, reset the timer
  45. self.state.delta_t = self.state.repetition_delay
  46. return self.state