| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263 |
- from dataclasses import dataclass
- from loguru import logger
- from data_models.mqtt_message import MqttMessage
- from devs_models.mqtt_control_unit import MQTTControlUnit
- from pypdevs.DEVS import AtomicDEVS, CoupledDEVS
- from pypdevs.infinity import INFINITY
class TestModel(CoupledDEVS):
    """Coupled model that wires an MQTT control unit to the looping test model.

    The model owns two sub-models, an ``MQTTControlUnit`` and a
    ``TestModelAtomic``, and exposes a single ``REALTIME_OBSERVED`` output
    port that relays the control unit's port of the same name.
    """

    def __init__(self, name: str, automatic: bool = True):
        """Build the sub-models and connect their ports.

        Args:
            name: Name handed to the ``CoupledDEVS`` base constructor.
            automatic: Accepted for interface compatibility; it is not read
                anywhere in this constructor.
        """
        super().__init__(name)

        # Outward-facing port used to relay the control unit's observations.
        self.REALTIME_OBSERVED = self.addOutPort('REALTIME_OBSERVED')

        # Sub-models are registered in this order: control unit first,
        # then the atomic test model.
        control_unit = MQTTControlUnit("MQTTControlUnit")
        self.mqtt_control: MQTTControlUnit = self.addSubModel(control_unit)
        looping_model = TestModelAtomic("TestModel")
        self.test_model: TestModelAtomic = self.addSubModel(looping_model)

        # MQTT messages flow from the control unit into the test model ...
        self.connectPorts(self.mqtt_control.mqtt_out, self.test_model.mqtt_in)
        # ... and the control unit's realtime output is forwarded outward.
        self.connectPorts(
            self.mqtt_control.REALTIME_OBSERVED,
            self.REALTIME_OBSERVED,
        )
@dataclass
class TestModelState:
    """State record held by ``TestModelAtomic``.

    Fields:
        repetition_delay: Interval that ``delta_t`` is reset to after each
            internal transition.
        delta_t: Time remaining until the next internal transition; defaults
            to ``INFINITY`` when not supplied.
    """

    # Required: there is no default, so it must be given on construction.
    repetition_delay: float

    # Optional countdown value; TestModelAtomic overwrites it right after
    # creating the state.
    delta_t: float = INFINITY
class TestModelAtomic(AtomicDEVS):
    """A test model which logs a marker message on a fixed loop.

    The model counts down ``state.delta_t``; each time it expires the output
    function logs a message and the internal transition re-arms the timer
    with ``state.repetition_delay``. MQTT messages arriving on ``mqtt_in``
    are only traced; they do not reset the timer.
    """

    def __init__(self, name: str, repetition_delay: float = 5.0):
        """Create the model with its input port and initial timer.

        Args:
            name: Model name; needs to be unique to refer to the model.
            repetition_delay: Interval between two consecutive outputs, in
                simulation time units.
        """
        super().__init__(name)
        self.mqtt_in = self.addInPort("mqtt_in")
        # The first output is due one full repetition delay after start.
        self.state = TestModelState(repetition_delay, delta_t=repetition_delay)

    def extTransition(self, inputs):
        """Handle incoming MQTT messages without restarting the timer.

        Args:
            inputs: Mapping from input port to the list of messages received
                on that port.

        Returns:
            The updated state object.
        """
        # Keep the schedule: subtract the time that already passed. Clamp at
        # zero because floating-point drift could otherwise leave a tiny
        # negative remainder, and a negative time advance is not valid.
        self.state.delta_t = max(0.0, self.state.delta_t - self.elapsed)

        if self.mqtt_in in inputs:
            msg: MqttMessage
            # Inspect every message in the bag, not just the first one, so
            # simultaneous arrivals are not silently dropped from the trace.
            for msg in inputs[self.mqtt_in]:
                if 'simulation' in msg.topic:
                    logger.trace(f"TestModel '{self.name}' received: {msg}")
        return self.state  # important, return state

    def timeAdvance(self):
        """Return the time remaining until the next internal transition."""
        return self.state.delta_t

    def outputFnc(self):
        """Log the loop marker and emit no output events.

        Returns:
            An empty dict: nothing is sent on any port.
        """
        logger.critical("TestModel OUTPUT_FNC")
        return {}

    def intTransition(self):
        """Re-arm the timer with the configured repetition delay.

        Returns:
            The updated state object.
        """
        # We have just output, reset the timer
        self.state.delta_t = self.state.repetition_delay
        return self.state
|