123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133 |
- from dataclasses import dataclass
- from enum import Enum
- from loguru import logger
- from data_models.mqtt_message import MqttMessage
- from pypdevs.DEVS import AtomicDEVS
- from pypdevs.infinity import INFINITY
- from data_models.crate import Crate
- from utils.timed_phase_enum import TimedPhaseEnum
class CrateTransporterPhase(TimedPhaseEnum):
    """Phases a crate transporter can be in, each declared as ``(name, timing)``.

    NOTE(review): TimedPhaseEnum is defined elsewhere; presumably it exposes the
    first element as ``.value`` and the second as ``.timing`` -- confirm against
    utils.timed_phase_enum.
    """

    # Waiting for a crate; the associated timing is INFINITY
    IDLE = "IDLE", INFINITY
    # Carrying a crate from the left side to the right side
    MOVE_L_TO_R = "MOVE_L_TO_R", 1.150
    # Carrying a crate from the right side to the left side
    MOVE_R_TO_L = "MOVE_R_TO_L", 0.965
@dataclass
class CrateTransporterState:
    """State record of a CrateTransporter; every field has a default (empty, idle)."""

    # Crate currently held by the transporter, or None when it is empty
    cargo: Crate | None = None

    # Phase the transporter is currently in
    phase: CrateTransporterPhase = CrateTransporterPhase.IDLE

    # Time remaining in the current phase (INFINITY while nothing is scheduled)
    delta_t: float = INFINITY

    # True while a visualization message still has to be emitted
    visual_update_pending: bool = False
class CrateTransporter(AtomicDEVS):
    """
    A crate transporter which carries a single crate from one side to the other.

    The duration of a transfer depends on the direction and is the timing of the
    active CrateTransporterPhase (MOVE_L_TO_R or MOVE_R_TO_L).

    [HBW] left_in  -----> right_out [VGR]
          left_out <----- right_in

    Next to the crate ports there is an MQTT port pair: every phase change and
    every "refresh" request received on mqtt_in schedules an immediate
    visualization message on mqtt_out.
    """

    def __init__(self, name: str):
        """ Create the transporter with its ports and an empty, idle state """
        super().__init__(name)

        # IN PORTS
        self.left_in = self.addInPort("left_in")
        self.right_in = self.addInPort("right_in")
        self.mqtt_in = self.addInPort("mqtt_in")

        # OUT PORTS
        self.right_out = self.addOutPort("right_out")
        self.left_out = self.addOutPort("left_out")
        self.mqtt_out = self.addOutPort("mqtt_out")

        self.state = CrateTransporterState()

    def change_phase(self, new_phase: CrateTransporterPhase):
        """ Wrapper for changing the phase and time associated with it, helps with logging """
        self.state.phase = new_phase
        self.state.delta_t = new_phase.timing
        logger.trace(f"{type(self).__name__} '{self.name}' phase changed to {new_phase}")
        # Every phase change has to be published to the visualization
        self.state.visual_update_pending = True

    def get_visual_update_data(self) -> MqttMessage:
        """ Get visual update data for the animation, contains the action taken and the duration of that action left """
        message = MqttMessage()
        message.topic = "visualization/ct"

        # An infinite duration is reported as None in the payload
        duration = self.state.delta_t
        if duration == INFINITY:
            duration = None

        message.payload = {
            "action": self.state.phase.value,
            "duration": duration,  # time left in the current phase
            "crate": self.state.cargo.to_dict() if self.state.cargo else None,  # cargo (crate), if present
        }
        return message

    def _is_input_valid(self, input_item) -> bool:
        """ Checks if the received input is a crate and the transporter is not already holding one """
        if not isinstance(input_item, Crate):
            logger.error(f"{type(self).__name__} '{self.name}' received input which is not a crate: {input_item}")
            return False
        if self.state.cargo is not None:
            logger.error(f"{type(self).__name__} '{self.name}' received input crate whilst already holding a crate")
            return False
        return True

    def extTransition(self, inputs):
        """
        Handle external input: visual refresh requests on mqtt_in and crates on left_in / right_in.

        A refresh request only flags a pending visual update. A crate is accepted only
        while IDLE and only if it passes _is_input_valid; it then starts the matching
        move phase. Crates arriving together with an MQTT message are still processed.
        """
        # Account for the time already spent in the current phase
        self.state.delta_t -= self.elapsed

        # Handle MQTT input, which is used for visual update requests
        if self.mqtt_in in inputs:
            refresh_requested = any(
                mqtt_message.topic == "simulation/ctrl/all"
                and mqtt_message.payload.get("action") == "refresh"
                for mqtt_message in inputs[self.mqtt_in]
            )
            if refresh_requested:
                self.state.visual_update_pending = True

        has_left = self.left_in in inputs
        has_right = self.right_in in inputs
        if not (has_left or has_right):
            # Only MQTT input in this bag, nothing else to do
            return self.state

        # Check if input while moving
        if self.state.phase != CrateTransporterPhase.IDLE:
            logger.error(f"{type(self).__name__} '{self.name}' received input while moving {self.state.phase}")
            return self.state

        if has_left:
            item = inputs[self.left_in][0]
            new_phase = CrateTransporterPhase.MOVE_L_TO_R
            if has_right:
                # Only one crate fits; left_in takes priority, make the drop visible
                logger.error(
                    f"{type(self).__name__} '{self.name}' received crates on both sides at once, "
                    f"ignoring right_in: {inputs[self.right_in][0]}"
                )
        else:  # right_in
            item = inputs[self.right_in][0]
            new_phase = CrateTransporterPhase.MOVE_R_TO_L

        if self._is_input_valid(item):
            # Add the item and proceed to the next state
            logger.trace(f"{type(self).__name__} '{self.name}' received: {item}")
            self.state.cargo = item
            self.change_phase(new_phase)

        return self.state  # important, return state

    def timeAdvance(self):
        """ Return 0.0 when a visual update is pending, otherwise the time left in the current phase """
        if self.state.visual_update_pending:
            return 0.0
        return self.state.delta_t

    def outputFnc(self):
        """ Output the pending visual update if there is one, otherwise the crate on the side it moved to """
        if self.state.visual_update_pending:
            return {self.mqtt_out: [self.get_visual_update_data()]}

        logger.trace(f"{type(self).__name__} '{self.name}' outputs: {self.state.cargo}")
        if self.state.phase == CrateTransporterPhase.MOVE_R_TO_L:
            return {self.left_out: [self.state.cargo]}
        elif self.state.phase == CrateTransporterPhase.MOVE_L_TO_R:
            return {self.right_out: [self.state.cargo]}
        return {}

    def intTransition(self):
        """ Clear a pending visual update, or return to IDLE after a crate has been output """
        if self.state.visual_update_pending:
            # The visual update was just sent; the current phase continues with its remaining time
            self.state.visual_update_pending = False
            return self.state

        # We have just output a crate, so set idle again and remove the cargo
        # (change_phase flags a new visual update, so the idle state gets published)
        self.change_phase(CrateTransporterPhase.IDLE)
        self.state.cargo = None
        return self.state
|