from dataclasses import dataclass
from enum import Enum

from loguru import logger

from data_models.mqtt_message import MqttMessage
from pypdevs.DEVS import AtomicDEVS
from pypdevs.infinity import INFINITY
from data_models.crate import Crate
from utils.timed_phase_enum import TimedPhaseEnum


class CrateTransporterPhase(TimedPhaseEnum):
    """Phases of the crate transporter, each paired with its duration."""

    # Each member is a (name, duration) pair; the duration is read back
    # through the `timing` attribute in CrateTransporter.change_phase.
    IDLE = ('IDLE', INFINITY)
    MOVE_L_TO_R = ('MOVE_L_TO_R', 1.150)
    MOVE_R_TO_L = ('MOVE_R_TO_L', 0.965)


@dataclass
class CrateTransporterState:
    """Mutable state of a CrateTransporter."""

    # Crate currently being carried, or None when the transporter is empty.
    cargo: Crate | None = None
    # Current phase of the transporter.
    phase: CrateTransporterPhase = CrateTransporterPhase.IDLE
    # Time remaining in the current phase (INFINITY while idle).
    delta_t: float = INFINITY
    # True when a visualisation message must be emitted before anything else.
    visual_update_pending: bool = False


class CrateTransporter(AtomicDEVS):
    """
    A crate transporter which transports crates from one side to the other.

    The duration of a transfer depends on the direction and is defined by the
    timing of the corresponding CrateTransporterPhase.

    [HBW] left_in  -----> right_out [VGR]
          left_out <----- right_in
    """

    def __init__(self, name: str):
        super().__init__(name)

        # IN PORTS
        self.left_in = self.addInPort("left_in")
        self.right_in = self.addInPort("right_in")
        self.mqtt_in = self.addInPort("mqtt_in")

        # OUT PORTS
        self.right_out = self.addOutPort("right_out")
        self.left_out = self.addOutPort("left_out")
        self.mqtt_out = self.addOutPort("mqtt_out")

        self.state = CrateTransporterState()

    def change_phase(self, new_phase: CrateTransporterPhase) -> None:
        """
        Wrapper for changing the phase and the time associated with it,
        helps with logging.

        Every phase change also schedules a visual update.
        """
        self.state.phase = new_phase
        self.state.delta_t = new_phase.timing
        logger.trace(f"{type(self).__name__} '{self.name}' phase changed to {new_phase}")
        self.state.visual_update_pending = True

    def get_visual_update_data(self) -> MqttMessage:
        """
        Get visual update data for the animation: the current action, the
        time left in that action, and the crate being carried (if any).
        """
        message = MqttMessage()
        message.topic = "visualization/ct"

        # INFINITY (idle) is reported as "no duration".
        duration = self.state.delta_t
        if duration == INFINITY:
            duration = None

        cargo = self.state.cargo
        message.payload = {
            "action": self.state.phase.value,
            "duration": duration,  # time remaining in the current phase
            "crate": cargo.to_dict() if cargo is not None else None,  # cargo (crate), if present
        }
        return message

    def _is_input_valid(self, input_item) -> bool:
        """Check if the received input is valid for the current state.

        Returns False (and logs an error) when the input is not a Crate or
        when the transporter is already holding a crate.
        """
        if not isinstance(input_item, Crate):
            logger.error(f"{type(self).__name__} '{self.name}' received input which is not a crate: {input_item}")
            return False

        if self.state.cargo is not None:
            logger.error(f"{type(self).__name__} '{self.name}' received input crate whilst already holding a crate")
            return False

        return True

    def extTransition(self, inputs):
        """
        Handle external input: MQTT refresh requests and incoming crates.

        A refresh request only schedules a visual update. A crate is accepted
        only while idle and empty; otherwise an error is logged and the crate
        is ignored. Only the first message on each port is considered.
        """
        # Account for the time that passed since the last transition.
        self.state.delta_t -= self.elapsed

        if self.mqtt_in in inputs:
            # Handle MQTT input, which is used for visual update requests
            mqtt_message = inputs[self.mqtt_in][0]
            if (mqtt_message.topic == "simulation/ctrl/all"
                    and mqtt_message.payload.get("action") == "refresh"):
                self.state.visual_update_pending = True
            # Do not return here: a crate may arrive in the same input bag
            # and must not be silently lost.

        if self.left_in in inputs:
            item = inputs[self.left_in][0]
            new_phase = CrateTransporterPhase.MOVE_L_TO_R
        elif self.right_in in inputs:
            item = inputs[self.right_in][0]
            new_phase = CrateTransporterPhase.MOVE_R_TO_L
        else:
            # Nothing arrived on the crate ports.
            return self.state

        # Check if input while moving
        if self.state.phase != CrateTransporterPhase.IDLE:
            logger.error(f"{type(self).__name__} '{self.name}' received input while moving {self.state.phase}")
            return self.state

        if self._is_input_valid(item):
            # Add the item and proceed to the next state
            logger.trace(f"{type(self).__name__} '{self.name}' received: {item}")
            self.state.cargo = item
            self.change_phase(new_phase)

        return self.state  # important, return state

    def timeAdvance(self):
        """Return 0 while a visual update is pending, else the remaining phase time."""
        if self.state.visual_update_pending:
            return 0.0
        return self.state.delta_t

    def outputFnc(self):
        """
        Emit the pending visual update if there is one; otherwise output the
        carried crate on the port matching the direction of travel.
        """
        if self.state.visual_update_pending:
            return {self.mqtt_out: [self.get_visual_update_data()]}

        logger.trace(f"{type(self).__name__} '{self.name}' outputs: {self.state.cargo}")

        if self.state.phase == CrateTransporterPhase.MOVE_R_TO_L:
            return {self.left_out: [self.state.cargo]}
        elif self.state.phase == CrateTransporterPhase.MOVE_L_TO_R:
            return {self.right_out: [self.state.cargo]}
        return {}

    def intTransition(self):
        """
        Clear a pending visual update, or, after a crate has been output,
        return to idle and drop the cargo.
        """
        if self.state.visual_update_pending:
            # The visual update was just sent; the phase itself is unchanged.
            self.state.visual_update_pending = False
            return self.state

        # We have just output a crate, so set idle again and remove the cargo
        self.change_phase(CrateTransporterPhase.IDLE)
        self.state.cargo = None
        return self.state