123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237 |
- from dataclasses import dataclass
- from loguru import logger
- from data_models.mqtt_message import MqttMessage
- from data_models.workpiece import Workpiece, WorkpieceColor
- from devs_models.sld_holding_bay import HoldingBay
- from pypdevs.DEVS import AtomicDEVS, CoupledDEVS
- from pypdevs.infinity import INFINITY
- from utils.get_timestamp import get_timestamp
- from utils.timed_phase_enum import TimedPhaseEnum
class SLDPhase(TimedPhaseEnum):
    """Phases of the sorting line, declared in processing order.

    Every member is a ``(label, timing)`` pair: the label names the phase and
    the timing is how long the conveyor stays in it.
    """

    # Waiting for a workpiece; never times out on its own.
    IDLE = ("IDLE", INFINITY)

    # Conveyor transport past the three color checkpoints (white, red, blue).
    SORTING_C1_WHITE = ("SORTING_C1_WHITE", 5.97)
    SORTING_C2_RED = ("SORTING_C2_RED", 1.467)
    SORTING_C3_BLUE = ("SORTING_C3_BLUE", 1.501)

    # Pushing the workpiece into its output bay.
    EJECT = ("EJECT", 0.300)

    # Delay until the pusher has retracted and the workpiece counts as sorted.
    SORTED = ("SORTED", 0.463)
@dataclass
class SLDState:
    """Complete state of the :class:`SLDConveyor` atomic model."""

    # Time left in the current phase.
    delta_t: float = INFINITY
    # Phase the conveyor is currently in.
    phase: SLDPhase = SLDPhase.IDLE
    # Workpiece currently on the conveyor, if any.
    workpiece: Workpiece | None = None
    # When set, a status message is output immediately (starts out set).
    status_requested: bool = True
    # Color of the workpiece that was sorted most recently.
    last_sorted_color: WorkpieceColor | None = None
    # When set, a visualization update is output immediately.
    visual_update_pending: bool = False
class SLDConveyor(AtomicDEVS):
    """SLD (Sorting Line with color Detection) conveyor.

    A workpiece received on ``inp`` moves past the white, red and blue
    checkpoints, is ejected at the checkpoint matching its color and is then
    sent out on the matching ``out_*`` port. Status, acknowledgement and
    visualization messages are published on ``mqtt_out``; refresh requests
    are received on ``mqtt_in``.
    """

    def __init__(self, name: str):
        """Create the in/out ports and the initial (idle) state."""
        super().__init__(name)

        self.inp = self.addInPort("inp")
        self.mqtt_in = self.addInPort("mqtt_in")  # to receive refresh status request

        # Out ports to holding bays for all colors
        self.out_white = self.addOutPort("out_white")
        self.out_red = self.addOutPort("out_red")
        self.out_blue = self.addOutPort("out_blue")
        self.mqtt_out = self.addOutPort("mqtt_out")

        self.state = SLDState()

    def change_phase(self, new_phase: SLDPhase):
        """Switch to ``new_phase`` and restart the phase timer.

        Also logs the change, remembers the color of the workpiece when the
        EJECT phase is entered, and schedules a visualization update.
        """
        self.state.phase = new_phase
        self.state.delta_t = new_phase.timing
        logger.trace(f"{type(self).__name__} '{self.name}' phase changed to {new_phase}")

        if new_phase == SLDPhase.EJECT:
            # The workpiece is removed after ejecting, so keep its color for the ack
            self.state.last_sorted_color = self.state.workpiece.color

        # Every phase change is announced to the visualization
        self.state.visual_update_pending = True

    def get_visual_update_data(self) -> MqttMessage:
        """Build the visualization message for the current phase.

        The payload holds the current action, the time left in that action
        (``None`` when the phase has no end), the workpiece on the conveyor
        and the color that was sorted last.
        """
        duration = self.state.delta_t
        if duration == INFINITY:
            duration = None

        workpiece = self.state.workpiece
        last_color = self.state.last_sorted_color

        message = MqttMessage()
        message.topic = "visualization/sld"
        message.payload = {
            "action": self.state.phase.value,
            "duration": duration,  # should be equal to the timing of the phase
            "workpiece": workpiece.to_dict() if workpiece is not None else None,
            "last_sorted_color": last_color.value if last_color is not None else None,
        }
        return message

    def _is_active(self):
        """Whether the SLDConveyor is in an active phase (!= IDLE)."""
        return self.state.phase != SLDPhase.IDLE

    def get_status_message(self, use_next_phase=True) -> MqttMessage:
        """Build the ``f/i/state/sld`` status message.

        :param use_next_phase: report the phase returned by ``phase.next()``
            instead of the current phase. Used when the message is emitted
            right before the model moves on to that phase.
        """
        phase = self.state.phase.next() if use_next_phase else self.state.phase
        active = 0 if phase == SLDPhase.IDLE else 1
        code = 2 if active == 1 else 1

        message = MqttMessage()
        message.topic = "f/i/state/sld"
        message.payload = {
            "active": active,
            "code": code,
            "description": '',
            "station": 'sld',
            "ts": get_timestamp(),
        }
        return message

    def get_sld_sorted_ack(self) -> MqttMessage:
        """Build the ack message announcing that a workpiece has been sorted and is ready for pick-up.

        Requires ``last_sorted_color`` to be set, which happens when the
        EJECT phase is entered.
        """
        message = MqttMessage()
        message.topic = "fl/sld/ack"
        message.payload = {
            "code": 2,
            "colorValue": 777,  # TODO: figure out how to determine
            "ts": get_timestamp(),
            "type": self.state.last_sorted_color.value,
        }
        return message

    def extTransition(self, inputs):
        """Handle refresh requests and incoming workpieces.

        Both ports are handled independently: a refresh request and a
        workpiece that arrive in the same input bag are both processed, so a
        workpiece is never silently dropped because an MQTT message happened
        to arrive at the same time.
        """
        self.state.delta_t -= self.elapsed

        if self.mqtt_in in inputs:
            # Any refresh request in the bag triggers a status + visual update
            refresh_requested = any(
                mqtt_message.topic == "simulation/ctrl/all"
                and mqtt_message.payload.get("action") == "refresh"
                for mqtt_message in inputs[self.mqtt_in]
            )
            if refresh_requested:
                self.state.status_requested = True
                self.state.visual_update_pending = True

        if self.inp in inputs:
            if self._is_active():
                # Already busy with a workpiece: the new one cannot be accepted
                logger.error(f"{type(self).__name__} '{self.name}' received input while not expecting: {inputs}")
            else:
                wp: Workpiece = inputs[self.inp][0]
                self.state.workpiece = wp
                self.change_phase(SLDPhase.SORTING_C1_WHITE)
                self.state.status_requested = True  # request status message

        return self.state  # important, return state

    def timeAdvance(self):
        """Return 0 while an update/status output is pending, else the time left in the phase."""
        if self.state.visual_update_pending or self.state.status_requested:
            return 0.0
        return self.state.delta_t

    def outputFnc(self):
        """Produce the output for the imminent internal transition.

        Pending visual updates take priority over pending status messages,
        which take priority over the regular phase output (the workpiece at
        the end of EJECT, status + ack at the end of SORTED).
        """
        if self.state.visual_update_pending:
            return {self.mqtt_out: [self.get_visual_update_data()]}

        if self.state.status_requested:
            return {self.mqtt_out: [self.get_status_message(use_next_phase=False)]}  # output status

        if self.state.phase == SLDPhase.EJECT:
            # Output depending on color
            output_mapping = {
                WorkpieceColor.WHITE: self.out_white,
                WorkpieceColor.RED: self.out_red,
                WorkpieceColor.BLUE: self.out_blue,
            }
            target = self.state.workpiece.color
            if target in output_mapping:
                return {output_mapping[target]: [self.state.workpiece]}
            logger.error(f"{type(self).__name__} '{self.name}' unknown sorting target: {target}")

        if self.state.phase == SLDPhase.SORTED:
            return {self.mqtt_out: [self.get_status_message(), self.get_sld_sorted_ack()]}

        return {}

    def intTransition(self):
        """Clear a pending output flag, or advance to the next phase.

        One pending flag is cleared per transition, mirroring the single
        message emitted by ``outputFnc``. Otherwise the conveyor jumps to
        EJECT when the workpiece reached the checkpoint of its own color, and
        in all other cases moves on to ``phase.next()``.
        """
        if self.state.visual_update_pending:
            self.state.visual_update_pending = False
            return self.state

        if self.state.status_requested:
            self.state.status_requested = False  # reset
            return self.state

        workpiece = self.state.workpiece
        phase = self.state.phase
        if workpiece is not None:
            # EJECT workpiece when its color matches the checkpoint for the target
            checkpoints = (
                (WorkpieceColor.WHITE, SLDPhase.SORTING_C1_WHITE),
                (WorkpieceColor.RED, SLDPhase.SORTING_C2_RED),
                (WorkpieceColor.BLUE, SLDPhase.SORTING_C3_BLUE),
            )
            if any(workpiece.color == color and phase == checkpoint for color, checkpoint in checkpoints):
                self.change_phase(SLDPhase.EJECT)
                return self.state
            if phase == SLDPhase.EJECT:
                self.state.workpiece = None  # remove the workpiece we just output

        # Transition to the next phase
        self.change_phase(self.state.phase.next())
        return self.state
class SLD(CoupledDEVS):
    """SLD: Sorting Line with Color detection.

    Coupled model that wires one sorting conveyor to three holding bays
    (white, red, blue) and exposes their workpiece and MQTT ports.
    """

    def __init__(self, name: str):
        """Create the sub-models, the external ports and all couplings."""
        super().__init__(name)

        # Sub-models: the conveyor followed by one holding bay per color
        self.sld_conveyor: SLDConveyor = self.addSubModel(SLDConveyor(f"{name}_conveyor"))
        self.white_bay: HoldingBay = self.addSubModel(HoldingBay(f"{name}_white_bay", "WHITE"))
        self.red_bay: HoldingBay = self.addSubModel(HoldingBay(f"{name}_red_bay", "RED"))
        self.blue_bay: HoldingBay = self.addSubModel(HoldingBay(f"{name}_blue_bay", "BLUE"))

        # External input ports
        self.inp = self.addInPort("inp")  # SLD conveyor input
        self.mqtt_in = self.addInPort("mqtt_in")  # for refresh requests
        self.white_in = self.addInPort("white_in")  # vgr requesting pickup
        self.red_in = self.addInPort("red_in")  # vgr requesting pickup
        self.blue_in = self.addInPort("blue_in")  # vgr requesting pickup

        # External output ports
        self.white_out = self.addOutPort("white_out")
        self.red_out = self.addOutPort("red_out")
        self.blue_out = self.addOutPort("blue_out")
        self.mqtt_out = self.addOutPort("mqtt_out")

        bays = (self.white_bay, self.red_bay, self.blue_bay)
        conveyor_outputs = (
            self.sld_conveyor.out_white,
            self.sld_conveyor.out_red,
            self.sld_conveyor.out_blue,
        )
        pickup_inputs = (self.white_in, self.red_in, self.blue_in)
        pickup_outputs = (self.white_out, self.red_out, self.blue_out)

        # Workpiece flow: model input -> conveyor -> bay of the matching color
        self.connectPorts(self.inp, self.sld_conveyor.inp)
        for conveyor_out, bay in zip(conveyor_outputs, bays):
            self.connectPorts(conveyor_out, bay.sld_in)

        # Pickup requests go into the bays, their answers go back out
        for pickup_in, bay in zip(pickup_inputs, bays):
            self.connectPorts(pickup_in, bay.vgr_in)
        for bay, pickup_out in zip(bays, pickup_outputs):
            self.connectPorts(bay.vgr_out, pickup_out)

        # MQTT: fan the input out to every component, merge all their outputs
        components = (self.sld_conveyor, *bays)
        for component in components:
            self.connectPorts(self.mqtt_in, component.mqtt_in)
        for component in components:
            self.connectPorts(component.mqtt_out, self.mqtt_out)
|