from data_models.mqtt_message import MqttMessage from pypdevs.DEVS import AtomicDEVS from pypdevs.infinity import INFINITY from dataclasses import dataclass from loguru import logger from data_models.workpiece import Workpiece, WorkpieceColor from utils.get_timestamp import get_timestamp from utils.timed_phase_enum import TimedPhaseEnum class DsiPhase(TimedPhaseEnum): """ Steps in order, along with their timings """ IDLE = ('IDLE', INFINITY) WP_RECEIVED = ('WP_RECEIVED', 0.2) # delay before sending a message AWAIT_PICKUP = ('AWAIT_PICKUP', INFINITY) # await pickup by vgr EXPORT_WP = ('EXPORT_WP', 0.0) # immediately handover wp to vgr AWAIT_VGR_CONFIRMATION = ('AWAIT_VGR_CONFIRMATION', INFINITY) # await for fl/vgr/do confirmation @dataclass class InputBayState: delta_t: float = INFINITY phase: DsiPhase = DsiPhase.IDLE workpiece: Workpiece | None = None status_requested = True # if true, publish status visual_update_pending: bool = False class DSI(AtomicDEVS): """ Input bay which holds a single workpiece (Delivery Station Input) """ def __init__(self, name: str): # name needs to be unique to refer to it super(DSI, self).__init__(name) self.inp = self.addInPort("inp") self.out = self.addOutPort("out") self.mqtt_in = self.addInPort("mqtt_in") self.mqtt_out = self.addOutPort("mqtt_out") self.vgr_in = self.addInPort("vgr_in") # input for vgr requesting workpiece self.state = InputBayState() def change_phase(self, new_phase: DsiPhase): """ Wrapper for changing the phase and time associated with it, helps with logging """ self.state.phase = new_phase self.state.delta_t = new_phase.timing logger.trace(f"{type(self).__name__} '{self.name}' phase changed to {new_phase}") self.state.visual_update_pending = True def get_visual_update_data(self) -> MqttMessage: """ Get visual update data for the animation, contains the action taken and the duration of that action left """ message = MqttMessage() message.topic = "visualization/dsi" duration = self.state.delta_t if duration == INFINITY: duration = None message.payload = { "action": self.state.phase.value, "duration": duration, # should be equal to the timing of the phase "workpiece": self.state.workpiece.to_dict() if self.state.workpiece else None, } return message def get_status_message(self, use_next_phase=True) -> MqttMessage: """ Get the status message for the input bay """ phase = self.state.phase.next() if use_next_phase else self.state.phase active = 0 if (phase == DsiPhase.IDLE or phase == DsiPhase.AWAIT_VGR_CONFIRMATION) else 1 code = 1 if (phase == DsiPhase.IDLE) else 0 message = MqttMessage() message.topic = "f/i/state/dsi" message.payload = { "active": active, "code": code, "description": '', "station": 'dsi', "ts": get_timestamp(), } return message def extTransition(self, inputs): self.state.delta_t -= self.elapsed if self.inp in inputs: if not self.state.phase == DsiPhase.IDLE: logger.error(f"{type(self).__name__} '{self.name}' received workpiece while not Idle!") return self.state wp = inputs[self.inp][0] logger.trace(f"{type(self).__name__} '{self.name}' received: {wp}") self.change_phase(DsiPhase.WP_RECEIVED) self.state.workpiece = wp if self.mqtt_in in inputs: msg = inputs[self.mqtt_in][0] # TODO: check if multiple messages are possible if msg.topic == "simulation/ctrl/all" and msg.payload.get("action") == "refresh": self.state.status_requested = True self.state.visual_update_pending = True return self.state elif msg.topic == "simulation/ctrl/dsi" and msg.payload['action'] == "workpiece_arrived": self.change_phase(DsiPhase.WP_RECEIVED) self.state.workpiece = Workpiece("?", WorkpieceColor.NONE) self.state.status_requested = True elif msg.topic == "fl/vgr/do" and msg.payload['code'] == 1: if self.state.phase == DsiPhase.AWAIT_VGR_CONFIRMATION: self.change_phase(DsiPhase.IDLE) self.state.status_requested = True # publish idle update else: logger.trace(f"{type(self).__name__} '{self.name}' received VGR confirmation whilst not expecting it") if self.vgr_in in inputs: if not self.state.phase == DsiPhase.AWAIT_PICKUP: logger.trace(f"{type(self).__name__} '{self.name}' received VGR pickup request whilst not expecting it") else: self.change_phase(DsiPhase.EXPORT_WP) return self.state # important, return state def timeAdvance(self): if self.state.visual_update_pending or self.state.status_requested: return 0.0 return self.state.delta_t def outputFnc(self): if self.state.visual_update_pending: return {self.mqtt_out: [self.get_visual_update_data()]} if self.state.status_requested or self.state.phase == DsiPhase.WP_RECEIVED: return {self.mqtt_out: [self.get_status_message(use_next_phase=False)]} if self.state.phase == DsiPhase.EXPORT_WP: return {self.out: [self.state.workpiece]} return {} def intTransition(self): if self.state.visual_update_pending: self.state.visual_update_pending = False return self.state if self.state.status_requested: self.state.status_requested = False # if requested, we've handled, so unset return self.state if self.state.phase == DsiPhase.EXPORT_WP: self.state.workpiece = None # We have just output the workpiece, so mark it as empty again # Transition to next state self.change_phase(self.state.phase.next()) return self.state