from dataclasses import dataclass

from loguru import logger

from data_models.mqtt_message import MqttMessage
from data_models.workpiece import Workpiece, WorkpieceColor
from devs_models.sld_holding_bay import HoldingBay
from pypdevs.DEVS import AtomicDEVS, CoupledDEVS
from pypdevs.infinity import INFINITY
from utils.get_timestamp import get_timestamp
from utils.timed_phase_enum import TimedPhaseEnum


class SLDPhase(TimedPhaseEnum):
    """ Steps in order, along with their timings """
    IDLE = ('IDLE', INFINITY)

    # sorting is happening, conveyor with 3 checkpoints (c123) white/red/blue
    SORTING_C1_WHITE = ('SORTING_C1_WHITE', 5.97)
    SORTING_C2_RED = ('SORTING_C2_RED', 1.467)
    SORTING_C3_BLUE = ('SORTING_C3_BLUE', 1.501)

    # eject to output bay
    EJECT = ('EJECT', 0.300)

    # 'delay' until pusher retracted and wp marked as 'sorted'
    SORTED = ('SORTED', 0.463)


@dataclass
class SLDState:
    """ Mutable state of the SLDConveyor atomic model """
    # time left in the current phase
    delta_t: float = INFINITY
    phase: SLDPhase = SLDPhase.IDLE
    # workpiece currently on the conveyor, if any
    workpiece: Workpiece | None = None
    # whether status is requested -> output status immediately
    status_requested: bool = True
    # color of the wp we just sorted
    last_sorted_color: WorkpieceColor | None = None
    # whether a visual update must be output immediately
    visual_update_pending: bool = False


class SLDConveyor(AtomicDEVS):
    """
    SLD Sorting Line with color Detection, which sorts the workpiece in the appropriate area
    """

    # Checkpoint phase at which a workpiece of a given color gets ejected
    _EJECT_CHECKPOINTS = {
        WorkpieceColor.WHITE: SLDPhase.SORTING_C1_WHITE,
        WorkpieceColor.RED: SLDPhase.SORTING_C2_RED,
        WorkpieceColor.BLUE: SLDPhase.SORTING_C3_BLUE,
    }

    def __init__(self, name: str):
        super().__init__(name)

        self.inp = self.addInPort("inp")
        self.mqtt_in = self.addInPort("mqtt_in")  # to receive refresh status request

        # Out ports to holding bays for all colors
        self.out_white = self.addOutPort("out_white")
        self.out_red = self.addOutPort("out_red")
        self.out_blue = self.addOutPort("out_blue")
        self.mqtt_out = self.addOutPort("mqtt_out")

        self.state = SLDState()

    def change_phase(self, new_phase: SLDPhase):
        """
        Wrapper for changing the phase and time associated with it, helps with logging.

        Entering EJECT also records the color of the current workpiece as the last sorted color.
        """
        self.state.phase = new_phase
        self.state.delta_t = new_phase.timing
        logger.trace(f"{type(self).__name__} '{self.name}' phase changed to {new_phase}")

        # Guard against a missing workpiece so a stray EJECT cannot raise AttributeError
        if new_phase == SLDPhase.EJECT and self.state.workpiece is not None:
            self.state.last_sorted_color = self.state.workpiece.color

        # NOTE(review): the flag is raised on every phase change, not only on EJECT -- confirm
        self.state.visual_update_pending = True

    def get_visual_update_data(self) -> MqttMessage:
        """
        Get visual update data for the animation, contains the action taken
        and the duration of that action left
        """
        # An infinite remaining time is reported as None
        duration = None if self.state.delta_t == INFINITY else self.state.delta_t
        workpiece = self.state.workpiece
        last_color = self.state.last_sorted_color

        message = MqttMessage()
        message.topic = "visualization/sld"
        message.payload = {
            "action": self.state.phase.value,
            "duration": duration,  # should be equal to the timing of the phase
            "workpiece": workpiece.to_dict() if workpiece else None,
            "last_sorted_color": last_color.value if last_color else None,
        }
        return message

    def _is_active(self):
        """ Whether the SLDConveyor is in an active phase (!= IDLE) """
        return self.state.phase != SLDPhase.IDLE

    @staticmethod
    def _is_refresh_request(mqtt_message) -> bool:
        """ Whether the MQTT message is a 'refresh' action on the simulation control topic """
        return (mqtt_message.topic == "simulation/ctrl/all"
                and mqtt_message.payload.get("action") == "refresh")

    def get_status_message(self, use_next_phase=True) -> MqttMessage:
        """
        Get the MQTT status message of this station.

        :param use_next_phase: report the phase following the current one instead of the current one
        :return: message with 'active' 0 and 'code' 1 for IDLE, 'active' 1 and 'code' 2 otherwise
        """
        phase = self.state.phase.next() if use_next_phase else self.state.phase
        active = 0 if phase == SLDPhase.IDLE else 1
        code = 2 if active == 1 else 1

        message = MqttMessage()
        message.topic = "f/i/state/sld"
        message.payload = {
            "active": active,
            "code": code,
            "description": '',
            "station": 'sld',
            "ts": get_timestamp(),
        }
        return message

    def get_sld_sorted_ack(self) -> MqttMessage:
        """
        Get the MQTT message which denotes that this SLD has sorted a workpiece,
        and it is ready for pick-up
        """
        last_color = self.state.last_sorted_color

        message = MqttMessage()
        message.topic = "fl/sld/ack"
        message.payload = {
            "code": 2,
            "colorValue": 777,  # TODO: figure out how to determine
            "ts": get_timestamp(),
            # Same None guard as the visual update, instead of raising AttributeError
            "type": last_color.value if last_color else None,
        }
        return message

    def extTransition(self, inputs):
        """
        Handle refresh requests on 'mqtt_in' and incoming workpieces on 'inp'.

        Both ports are handled independently, so a workpiece which arrives in the
        same bag as an MQTT message is not lost.
        """
        # Clamp at 0 so float rounding can never leave a negative remaining time
        self.state.delta_t = max(0.0, self.state.delta_t - self.elapsed)

        if self.mqtt_in in inputs:
            # Handle MQTT input, which is used for visual update requests;
            # look at every message in the bag, not only the first one
            if any(self._is_refresh_request(msg) for msg in inputs[self.mqtt_in]):
                self.state.status_requested = True
                self.state.visual_update_pending = True

        if self.inp in inputs:
            if self._is_active():
                # Busy with another workpiece: the input is rejected and only logged
                logger.error(f"{type(self).__name__} '{self.name}' received input while not expecting: {inputs}")
            else:
                wp: Workpiece = inputs[self.inp][0]
                self.state.workpiece = wp
                self.change_phase(SLDPhase.SORTING_C1_WHITE)
                self.state.status_requested = True  # request status message

        return self.state  # important, return state

    def timeAdvance(self):
        """ Pending visual/status output is sent immediately, otherwise wait out the phase """
        if self.state.visual_update_pending or self.state.status_requested:
            return 0.0
        return self.state.delta_t

    def outputFnc(self):
        """
        Produce the output for the upcoming internal transition.

        Priority: pending visual update, then requested status, then the output
        belonging to the phase which is ending (EJECT or SORTED).
        """
        if self.state.visual_update_pending:
            return {self.mqtt_out: [self.get_visual_update_data()]}

        if self.state.status_requested:
            return {self.mqtt_out: [self.get_status_message(use_next_phase=False)]}  # output status

        if self.state.phase == SLDPhase.EJECT:
            # Output depending on color
            output_mapping = {
                WorkpieceColor.WHITE: self.out_white,
                WorkpieceColor.RED: self.out_red,
                WorkpieceColor.BLUE: self.out_blue,
            }
            target = self.state.workpiece.color
            if target in output_mapping:
                return {output_mapping[target]: [self.state.workpiece]}
            logger.error(f"{type(self).__name__} '{self.name}' unknown sorting target: {target}")

        if self.state.phase == SLDPhase.SORTED:
            return {self.mqtt_out: [self.get_status_message(), self.get_sld_sorted_ack()]}

        return {}

    def intTransition(self):
        """
        Clear one pending output flag, or else advance the sorting process.

        The flags are cleared one per transition, in the same order in which
        outputFnc emits the corresponding messages.
        """
        if self.state.visual_update_pending:
            self.state.visual_update_pending = False
            return self.state

        if self.state.status_requested:
            self.state.status_requested = False  # reset
            return self.state

        workpiece = self.state.workpiece
        if workpiece is not None:
            # EJECT workpiece when its color matches the checkpoint for the target
            if self._EJECT_CHECKPOINTS.get(workpiece.color) == self.state.phase:
                self.change_phase(SLDPhase.EJECT)
                return self.state
            if self.state.phase == SLDPhase.EJECT:
                self.state.workpiece = None  # remove the workpiece we just output

        # Transition to the next phase
        self.change_phase(self.state.phase.next())
        return self.state


class SLD(CoupledDEVS):
    """
    SLD: Sorting Line with Color detection
    A Coupled model which combines the sorting line conveyor with 3 colored output bays
    """

    def __init__(self, name: str):
        super().__init__(name)

        # Components
        self.sld_conveyor: SLDConveyor = self.addSubModel(SLDConveyor(f"{name}_conveyor"))
        self.white_bay: HoldingBay = self.addSubModel(HoldingBay(f"{name}_white_bay", "WHITE"))
        self.red_bay: HoldingBay = self.addSubModel(HoldingBay(f"{name}_red_bay", "RED"))
        self.blue_bay: HoldingBay = self.addSubModel(HoldingBay(f"{name}_blue_bay", "BLUE"))

        # Own ports
        self.inp = self.addInPort("inp")  # SLD conveyor input
        self.mqtt_in = self.addInPort("mqtt_in")  # for refresh requests
        self.white_in = self.addInPort("white_in")  # vgr requesting pickup
        self.red_in = self.addInPort("red_in")  # vgr requesting pickup
        self.blue_in = self.addInPort("blue_in")  # vgr requesting pickup
        self.white_out = self.addOutPort("white_out")
        self.red_out = self.addOutPort("red_out")
        self.blue_out = self.addOutPort("blue_out")
        self.mqtt_out = self.addOutPort("mqtt_out")

        # Connect internal ports
        self.connectPorts(self.inp, self.sld_conveyor.inp)
        self.connectPorts(self.sld_conveyor.out_white, self.white_bay.sld_in)
        self.connectPorts(self.sld_conveyor.out_red, self.red_bay.sld_in)
        self.connectPorts(self.sld_conveyor.out_blue, self.blue_bay.sld_in)
        self.connectPorts(self.white_in, self.white_bay.vgr_in)
        self.connectPorts(self.red_in, self.red_bay.vgr_in)
        self.connectPorts(self.blue_in, self.blue_bay.vgr_in)
        self.connectPorts(self.white_bay.vgr_out, self.white_out)
        self.connectPorts(self.red_bay.vgr_out, self.red_out)
        self.connectPorts(self.blue_bay.vgr_out, self.blue_out)

        # Connect MQTT ports: every component gets the requests and shares one output
        components = (self.sld_conveyor, self.white_bay, self.red_bay, self.blue_bay)
        for component in components:
            self.connectPorts(self.mqtt_in, component.mqtt_in)
        for component in components:
            self.connectPorts(component.mqtt_out, self.mqtt_out)