123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293 |
- from data_models.mqtt_message import MqttMessage
- from pypdevs.DEVS import AtomicDEVS
- from pypdevs.infinity import INFINITY
- from dataclasses import dataclass
- from loguru import logger
- from data_models.workpiece import Workpiece
- from utils.timed_phase_enum import TimedPhaseEnum
- class ConveyorPhase(TimedPhaseEnum):
- """ Steps in order, along with their timings """
- IDLE = ('IDLE', INFINITY)
- RUNNING = ('RUNNING', 2.368)
- @dataclass
- class ConveyorState:
- delta_t: float = INFINITY
- phase: ConveyorPhase = ConveyorPhase.IDLE
- workpiece: Workpiece | None = None
- visual_update_pending: bool = False
- class SimpleConveyor(AtomicDEVS):
- """ A simple conveyor belt which moves a workpiece across its belt and detects when it has reached the end """
- def __init__(self, name: str):
- super(SimpleConveyor, self).__init__(name)
- self.inp = self.addInPort("inp")
- self.out = self.addOutPort("out")
- self.mqtt_in = self.addInPort("mqtt_in") # to receive visual update requests
- self.mqtt_out = self.addOutPort("mqtt_out")
- self.state = ConveyorState()
- def change_phase(self, new_phase: ConveyorPhase):
- """ Wrapper for changing the phase and time associated with it, helps with logging """
- self.state.phase = new_phase
- self.state.delta_t = new_phase.timing
- logger.trace(f"{type(self).__name__} '{self.name}' phase changed to {new_phase}")
- self.state.visual_update_pending = True
- def get_visual_update_data(self) -> MqttMessage:
- """ Get visual update data for the animation, contains the action taken and the duration of that action left """
- message = MqttMessage()
- message.topic = "visualization/conveyor"
- duration = self.state.delta_t
- if duration == INFINITY: duration = None
- message.payload = {
- "action": self.state.phase.value,
- "duration": duration, # should be equal to the timing of the phase
- "workpiece": self.state.workpiece.to_dict() if self.state.workpiece else None,
- }
- return message
- def extTransition(self, inputs):
- if self.mqtt_in in inputs:
- msg = inputs[self.mqtt_in][0]
- if msg.topic == "simulation/ctrl/all" and msg.payload.get("action") == "refresh":
- self.state.visual_update_pending = True
- return self.state
- if self.inp in inputs:
- new_workpiece = inputs[self.inp][0]
- logger.trace(f"{type(self).__name__} '{self.name}' received: {new_workpiece}")
- self.state.workpiece = new_workpiece
- self.change_phase(ConveyorPhase.RUNNING)
- return self.state # important, return state
- def timeAdvance(self):
- if self.state.visual_update_pending:
- return 0.0
- return self.state.delta_t
- def outputFnc(self):
- if self.state.visual_update_pending:
- return {self.mqtt_out: [self.get_visual_update_data()]}
- logger.trace(f"{type(self).__name__} '{self.name}' outputs: {self.state.workpiece}")
- return {self.out: [self.state.workpiece]}
- def intTransition(self):
- if self.state.visual_update_pending:
- self.state.visual_update_pending = False
- return self.state
- # We have just output the workpiece, so mark it as empty again
- self.state.workpiece = None
- self.change_phase(ConveyorPhase.IDLE)
- return self.state
|