123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153 |
- from data_models.mqtt_message import MqttMessage
- from pypdevs.DEVS import AtomicDEVS
- from pypdevs.infinity import INFINITY
- from dataclasses import dataclass
- from loguru import logger
- from data_models.workpiece import Workpiece
- from utils.get_timestamp import get_timestamp
- from utils.timed_phase_enum import TimedPhaseEnum
- class MPOTableSawPhase(TimedPhaseEnum):
- """ Steps in order, along with their timings """
- IDLE = ('IDLE', INFINITY)
- ROTATE_TO_FRONT = ('ROTATE_TO_FRONT', 2.697) # rotate table to face the gripper
- AWAIT_WP = ('AWAIT_WP', INFINITY) # await workpiece delivery by gripper
- ROTATE_TO_BACK = ('ROTATE_TO_BACK', 1.791) # rotate table to face saw (at back)
- SAW_RIGHT = ('SAW_RIGHT', 3.5)
- SAW_LEFT = ('SAW_LEFT', 3.5)
- ROTATE_TO_SIDE = ('ROTATE_TO_SIDE', 0.904) # turn table towards conveyor (to the right side)
- EJECT_WP = ('EJECT_WP', 0.630) # eject/push workpiece on conveyor
- @dataclass
- class MPOTableSawState:
- delta_t: float = INFINITY
- phase: MPOTableSawPhase = MPOTableSawPhase.IDLE
- workpiece: Workpiece | None = None
- status_requested: bool = True # if true, publish status
- visual_update_pending: bool = False
- def get_mpo_produced_msg() -> MqttMessage:
- """ Get the 'MPO_PRODUCED' mqtt message """
- message = MqttMessage()
- message.topic = "fl/mpo/ack"
- message.payload = {
- "code": 2,
- "ts": get_timestamp(),
- }
- return message
- class MPOTableSaw(AtomicDEVS):
- """ MPO rotating table and table saw """
- def __init__(self, name: str):
- super(MPOTableSaw, self).__init__(name)
- self.gripper_in = self.addInPort("gripper_in") # vacuum gripper which brings us workpiece
- self.gripper_out = self.addOutPort("gripper_out") # to acknowledge table turning to the gripper
- self.conveyor_out = self.addOutPort("conveyor_out") # conveyor belt output after we're done
- self.mqtt_in = self.addInPort("mqtt_in")
- self.mqtt_out = self.addOutPort("mqtt_out")
- self.state = MPOTableSawState()
- def change_phase(self, new_phase: MPOTableSawPhase):
- """ Wrapper for changing the phase and time associated with it, helps with logging """
- self.state.phase = new_phase
- self.state.delta_t = new_phase.timing
- logger.trace(f"{type(self).__name__} '{self.name}' phase changed to {new_phase}")
- self.state.visual_update_pending = True
- def get_visual_update_data(self) -> MqttMessage:
- """ Get visual update data for the animation, contains the action taken and the duration of that action left """
- message = MqttMessage()
- message.topic = "visualization/mpo/table_saw"
- duration = self.state.delta_t
- if duration == INFINITY: duration = None
- message.payload = {
- "action": self.state.phase.value,
- "duration": duration, # should be equal to the timing of the phase
- "workpiece": self.state.workpiece.to_dict() if self.state.workpiece else None,
- }
- return message
- def get_status_message(self, use_next_phase=True) -> MqttMessage:
- phase = self.state.phase.next() if use_next_phase else self.state.phase
- active = 0 if (phase == MPOTableSawPhase.IDLE) else 1
- code = 2 if active == 1 else 1
- message = MqttMessage()
- message.topic = "f/i/state/mpo"
- message.payload = {
- "active": active,
- "code": code,
- "description": '',
- "station": 'mpo',
- "ts": get_timestamp(),
- }
- return message
- def extTransition(self, inputs):
- self.state.delta_t -= self.elapsed
- if self.mqtt_in in inputs:
- msg = inputs[self.mqtt_in][0]
- if msg.topic == "simulation/ctrl/all" and msg.payload.get("action") == "refresh":
- self.state.status_requested = True
- self.state.visual_update_pending = True
- return self.state
- elif not (self.state.phase == MPOTableSawPhase.IDLE or self.state.phase == MPOTableSawPhase.AWAIT_WP):
- logger.error(f"{type(self).__name__} '{self.name}' received input while not expecting: {inputs}")
- return self.state
- if self.gripper_in in inputs:
- # 2 cases: instruction to turn, or workpiece
- if self.state.phase == MPOTableSawPhase.IDLE:
- self.change_phase(MPOTableSawPhase.ROTATE_TO_FRONT)
- elif self.state.phase == MPOTableSawPhase.AWAIT_WP:
- wp = inputs[self.gripper_in][0]
- self.state.workpiece = wp
- self.change_phase(MPOTableSawPhase.ROTATE_TO_BACK)
- else:
- logger.error(f"{type(self).__name__} '{self.name}' received gripper input while not expecting: {inputs}")
- return self.state # important, return state
- def timeAdvance(self):
- if self.state.visual_update_pending or self.state.status_requested:
- return 0.0
- return self.state.delta_t
- def outputFnc(self):
- if self.state.visual_update_pending:
- return {self.mqtt_out: [self.get_visual_update_data()]}
- if self.state.status_requested:
- return {self.mqtt_out: [self.get_status_message(use_next_phase=False)]}
- if self.state.phase == MPOTableSawPhase.ROTATE_TO_FRONT:
- return {self.gripper_out: {}, self.mqtt_out: [self.get_status_message()]} # acknowledge rotation
- elif self.state.phase == MPOTableSawPhase.EJECT_WP:
- # output wp, and publish 'mpo produced' message to mqtt
- return {self.conveyor_out: [self.state.workpiece], self.mqtt_out: [get_mpo_produced_msg(), self.get_status_message()]}
- return {self.mqtt_out: [self.get_status_message()]}
- def intTransition(self):
- if self.state.visual_update_pending:
- self.state.visual_update_pending = False
- return self.state
- if self.state.status_requested:
- self.state.status_requested = False
- return self.state
- if self.state.phase == MPOTableSawPhase.EJECT_WP:
- self.state.workpiece = None # remove the workpiece we just output to the conveyor
- # Transition to next state
- self.change_phase(self.state.phase.next())
- return self.state
|