from data_models.mqtt_message import MqttMessage from pypdevs.DEVS import AtomicDEVS from pypdevs.infinity import INFINITY from dataclasses import dataclass from loguru import logger from data_models.workpiece import Workpiece from utils.get_timestamp import get_timestamp from utils.timed_phase_enum import TimedPhaseEnum class MPOTableSawPhase(TimedPhaseEnum): """ Steps in order, along with their timings """ IDLE = ('IDLE', INFINITY) ROTATE_TO_FRONT = ('ROTATE_TO_FRONT', 2.697) # rotate table to face the gripper AWAIT_WP = ('AWAIT_WP', INFINITY) # await workpiece delivery by gripper ROTATE_TO_BACK = ('ROTATE_TO_BACK', 1.791) # rotate table to face saw (at back) SAW_RIGHT = ('SAW_RIGHT', 3.5) SAW_LEFT = ('SAW_LEFT', 3.5) ROTATE_TO_SIDE = ('ROTATE_TO_SIDE', 0.904) # turn table towards conveyor (to the right side) EJECT_WP = ('EJECT_WP', 0.630) # eject/push workpiece on conveyor @dataclass class MPOTableSawState: delta_t: float = INFINITY phase: MPOTableSawPhase = MPOTableSawPhase.IDLE workpiece: Workpiece | None = None status_requested: bool = True # if true, publish status visual_update_pending: bool = False def get_mpo_produced_msg() -> MqttMessage: """ Get the 'MPO_PRODUCED' mqtt message """ message = MqttMessage() message.topic = "fl/mpo/ack" message.payload = { "code": 2, "ts": get_timestamp(), } return message class MPOTableSaw(AtomicDEVS): """ MPO rotating table and table saw """ def __init__(self, name: str): super(MPOTableSaw, self).__init__(name) self.gripper_in = self.addInPort("gripper_in") # vacuum gripper which brings us workpiece self.gripper_out = self.addOutPort("gripper_out") # to acknowledge table turning to the gripper self.conveyor_out = self.addOutPort("conveyor_out") # conveyor belt output after we're done self.mqtt_in = self.addInPort("mqtt_in") self.mqtt_out = self.addOutPort("mqtt_out") self.state = MPOTableSawState() def change_phase(self, new_phase: MPOTableSawPhase): """ Wrapper for changing the phase and time associated with it, helps with logging """ self.state.phase = new_phase self.state.delta_t = new_phase.timing logger.trace(f"{type(self).__name__} '{self.name}' phase changed to {new_phase}") self.state.visual_update_pending = True def get_visual_update_data(self) -> MqttMessage: """ Get visual update data for the animation, contains the action taken and the duration of that action left """ message = MqttMessage() message.topic = "visualization/mpo/table_saw" duration = self.state.delta_t if duration == INFINITY: duration = None message.payload = { "action": self.state.phase.value, "duration": duration, # should be equal to the timing of the phase "workpiece": self.state.workpiece.to_dict() if self.state.workpiece else None, } return message def get_status_message(self, use_next_phase=True) -> MqttMessage: phase = self.state.phase.next() if use_next_phase else self.state.phase active = 0 if (phase == MPOTableSawPhase.IDLE) else 1 code = 2 if active == 1 else 1 message = MqttMessage() message.topic = "f/i/state/mpo" message.payload = { "active": active, "code": code, "description": '', "station": 'mpo', "ts": get_timestamp(), } return message def extTransition(self, inputs): self.state.delta_t -= self.elapsed if self.mqtt_in in inputs: msg = inputs[self.mqtt_in][0] if msg.topic == "simulation/ctrl/all" and msg.payload.get("action") == "refresh": self.state.status_requested = True self.state.visual_update_pending = True return self.state elif not (self.state.phase == MPOTableSawPhase.IDLE or self.state.phase == MPOTableSawPhase.AWAIT_WP): logger.error(f"{type(self).__name__} '{self.name}' received input while not expecting: {inputs}") return self.state if self.gripper_in in inputs: # 2 cases: instruction to turn, or workpiece if self.state.phase == MPOTableSawPhase.IDLE: self.change_phase(MPOTableSawPhase.ROTATE_TO_FRONT) elif self.state.phase == MPOTableSawPhase.AWAIT_WP: wp = inputs[self.gripper_in][0] self.state.workpiece = wp self.change_phase(MPOTableSawPhase.ROTATE_TO_BACK) else: logger.error(f"{type(self).__name__} '{self.name}' received gripper input while not expecting: {inputs}") return self.state # important, return state def timeAdvance(self): if self.state.visual_update_pending or self.state.status_requested: return 0.0 return self.state.delta_t def outputFnc(self): if self.state.visual_update_pending: return {self.mqtt_out: [self.get_visual_update_data()]} if self.state.status_requested: return {self.mqtt_out: [self.get_status_message(use_next_phase=False)]} if self.state.phase == MPOTableSawPhase.ROTATE_TO_FRONT: return {self.gripper_out: {}, self.mqtt_out: [self.get_status_message()]} # acknowledge rotation elif self.state.phase == MPOTableSawPhase.EJECT_WP: # output wp, and publish 'mpo produced' message to mqtt return {self.conveyor_out: [self.state.workpiece], self.mqtt_out: [get_mpo_produced_msg(), self.get_status_message()]} return {self.mqtt_out: [self.get_status_message()]} def intTransition(self): if self.state.visual_update_pending: self.state.visual_update_pending = False return self.state if self.state.status_requested: self.state.status_requested = False return self.state if self.state.phase == MPOTableSawPhase.EJECT_WP: self.state.workpiece = None # remove the workpiece we just output to the conveyor # Transition to next state self.change_phase(self.state.phase.next()) return self.state