1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889909192939495 |
- from dataclasses import dataclass
- from loguru import logger
- from data_models.mqtt_message import MqttMessage
- from data_models.workpiece import Workpiece
- from pypdevs.DEVS import AtomicDEVS
- from pypdevs.infinity import INFINITY
- @dataclass
- class HoldingBayState:
- color: str # 'WHITE' | 'RED' | 'BLUE'
- workpiece: Workpiece | None = None
- output_requested = False # if true, output workpiece
- visual_update_pending: bool = False
class HoldingBay(AtomicDEVS):
    """ Holding bay which holds a single (color) workpiece, part of the SLD. Handles pickup interaction with VGR """

    def __init__(self, name: str, color: str):
        """ Create the bay's ports and its initial (empty) state

        :param name: model name, needs to be unique to refer to it
        :param color: color handled by this bay, used in the visualization topic
        """
        super().__init__(name)

        self.sld_in = self.addInPort("sld_in")  # input of workpiece via SLD
        self.vgr_in = self.addInPort("vgr_in")  # input for VGR requesting workpiece
        self.mqtt_in = self.addInPort("mqtt_in")  # input for MQTT messages (e.g. visual refresh request)

        self.vgr_out = self.addOutPort("vgr_out")  # output for workpiece to the VGR
        self.mqtt_out = self.addOutPort("mqtt_out")  # output for visuals

        self.state = HoldingBayState(color)

    def get_visual_update_data(self) -> MqttMessage:
        """ Get visual update data for the animation, contains the workpiece currently held (or None) """
        held = self.state.workpiece
        message = MqttMessage()
        message.topic = f"visualization/sld_bay/{self.state.color}"
        message.payload = {
            "workpiece": held.to_dict() if held is not None else None,
        }
        return message

    @staticmethod
    def _is_refresh_request(msg: MqttMessage) -> bool:
        """ Tell whether an MQTT message is a visual refresh request """
        return msg.topic == "simulation/ctrl/all" and msg.payload.get("action") == "refresh"

    def extTransition(self, inputs):
        """ Handle incoming events on the MQTT, SLD and VGR ports

        Every port present in `inputs` is handled; events that arrive at the same time on
        different ports are all processed instead of only the first port that matches.

        :param inputs: mapping of input port to the list of events received on that port
        :return: the updated state
        """
        state = self.state

        if self.mqtt_in in inputs:
            # Any refresh request in the bag triggers a re-publish of the current visuals
            if any(self._is_refresh_request(msg) for msg in inputs[self.mqtt_in]):
                state.visual_update_pending = True

        if self.sld_in in inputs:
            state.visual_update_pending = True  # every transition requires a new visual update
            if state.workpiece is not None:
                # The bay is full: keep the held workpiece, the incoming one is not stored
                logger.error(f"{type(self).__name__} '{self.name}' received workpiece while already holding one!")
            else:
                wp = inputs[self.sld_in][0]
                state.workpiece = wp
                logger.trace(f"{type(self).__name__} '{self.name}' received: {wp}")

        # Handled after sld_in so a workpiece that arrived in the same bag can be handed over directly
        if self.vgr_in in inputs:
            state.visual_update_pending = True  # every transition requires a new visual update
            if state.workpiece is None:
                logger.trace(
                    f"{type(self).__name__} '{self.name}' received VGR pickup request whilst not having workpiece!")
            else:
                state.output_requested = True

        return state  # important, return state

    def timeAdvance(self):
        """ Fire immediately while an output or visual update is pending, otherwise wait for input """
        if self.state.output_requested or self.state.visual_update_pending:
            return 0.0
        return INFINITY

    def outputFnc(self):
        """ Output the held workpiece when requested, otherwise the pending visual update

        Only one of the two is sent per call; the workpiece hand-over takes priority.
        """
        if self.state.output_requested:
            return {self.vgr_out: [self.state.workpiece]}
        if self.state.visual_update_pending:
            return {self.mqtt_out: [self.get_visual_update_data()]}
        return {}

    def intTransition(self):
        """ Clear the flag of whatever outputFnc just sent and return the updated state """
        # ORDER IS IMPORTANT HERE: same priority as outputFnc, so the flag that is cleared
        # belongs to the output that was sent. A still pending visual update is left set and
        # is sent in a following step (with the bay already emptied).
        if self.state.output_requested:
            self.state.output_requested = False
            self.state.workpiece = None  # We just output a workpiece, so remove it from our state
        elif self.state.visual_update_pending:
            self.state.visual_update_pending = False
        return self.state
|