| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164 |
- from data_models.mqtt_message import MqttMessage
- from pypdevs.DEVS import AtomicDEVS
- from pypdevs.infinity import INFINITY
- from dataclasses import dataclass, field
- from loguru import logger
- from data_models.workpiece import Workpiece, WorkpieceColor
- from utils.get_timestamp import get_timestamp
- from utils.timed_phase_enum import TimedPhaseEnum
class InputRoutine(TimedPhaseEnum):
    """Phases the reader station steps through when a new workpiece enters the factory.

    Every member is declared as a ``(name, timing)`` pair. Phases whose timing
    is ``INFINITY`` do not time out on their own.
    """

    IDLE = ("IDLE", INFINITY)
    NFC_READING = ("NFC_READING", 1.367)
    # Parked here until the VGR has moved to the color sensor.
    IDLE_WAIT = ("IDLE_WAIT", INFINITY)
    COLOR_READING = ("COLOR_READING", 0.100)
    # Parked here until the VGR has moved to the NFC reader.
    IDLE_WAIT_2 = ("IDLE_WAIT_2", INFINITY)
    # Writing the tag is by far the slowest step of the input routine.
    NFC_WRITING = ("NFC_WRITING", 7.603)
class OutputRoutine(TimedPhaseEnum):
    """Phases the reader station steps through before a workpiece is delivered to the output.

    Every member is declared as a ``(name, timing)`` pair.
    """

    IDLE = ("IDLE", INFINITY)
    NFC_WRITING = ("NFC_WRITING", 3.902)
@dataclass
class ReaderStationState:
    """Mutable state of a ``ReaderStation`` model."""

    # Phase of the input or output routine the station is currently in.
    phase: InputRoutine | OutputRoutine = InputRoutine.IDLE
    # Time left in the current phase; INFINITY while the station is idle.
    delta_t: float = INFINITY
    # Workpiece currently held by the station, if any.
    workpiece: Workpiece | None = None
    # Color reported by the color sensor.
    color_read: WorkpieceColor = WorkpieceColor.NONE
    # True while a visualization message still has to be emitted.
    visual_update_pending: bool = False
class ReaderStation(AtomicDEVS):
    """Station which holds both the NFC reader/writer and the color sensor.

    Workpieces arrive on ``nfc_in`` / ``color_in`` and leave again on the
    matching ``nfc_out`` / ``color_out`` port once the current phase has timed
    out. Control messages arrive on ``mqtt_in``; visualization updates and NFC
    messages are published on ``mqtt_out``.
    """

    def __init__(self, name: str):
        """Create the station and its ports.

        Args:
            name: Model name; needs to be unique because it is used to refer
                to the model.
        """
        super().__init__(name)

        self.nfc_in = self.addInPort("nfc_in")
        self.nfc_out = self.addOutPort("nfc_out")
        self.color_in = self.addInPort("color_in")
        self.color_out = self.addOutPort("color_out")
        self.mqtt_in = self.addInPort("mqtt_in")
        self.mqtt_out = self.addOutPort("mqtt_out")

        self.state = ReaderStationState()

    def change_phase(self, new_phase: InputRoutine | OutputRoutine):
        """Switch to ``new_phase`` and restart the phase timer.

        Also logs the change and schedules a visualization update.

        Args:
            new_phase: Phase to enter; its ``timing`` becomes the new
                ``delta_t``.
        """
        self.state.phase = new_phase
        self.state.delta_t = new_phase.timing
        logger.trace(f"{type(self).__name__} '{self.name}' phase changed to {new_phase}")
        self.state.visual_update_pending = True

    def get_visual_update_data(self) -> MqttMessage:
        """Build the visualization message for the animation.

        Returns:
            Message on topic ``visualization/nfc`` carrying the current
            phase, the time left in it (``None`` when the phase has no
            timeout) and the held workpiece, if any.
        """
        message = MqttMessage()
        message.topic = "visualization/nfc"

        duration = self.state.delta_t
        if duration == INFINITY:
            # Report "no timeout" as None instead of passing INFINITY along.
            duration = None

        message.payload = {
            "action": self.state.phase.value,
            "duration": duration,  # should be equal to the timing of the phase
            "workpiece": self.state.workpiece.to_dict() if self.state.workpiece else None,
        }
        return message

    def get_nfc_message(self) -> MqttMessage:
        """Build the NFC message for the workpiece currently held.

        Requires ``self.state.workpiece`` to be set.

        Returns:
            Message on topic ``f/i/nfc/ds`` with the workpiece id and state,
            and the color last read by the color sensor as its type.
        """
        message = MqttMessage()
        message.topic = "f/i/nfc/ds"
        message.payload = {
            "history": [],  # TODO: history of NFC reading
            "ts": get_timestamp(),
            "workpiece": {
                "id": self.state.workpiece.id,
                "state": self.state.workpiece.state,
                "type": self.state.color_read.value,
            },
        }
        return message

    def extTransition(self, inputs) -> ReaderStationState:
        """Handle an external event (control message or arriving workpiece).

        Args:
            inputs: Mapping from input port to the list of values received
                on it; only the first value of each port is used.

        Returns:
            The updated model state.
        """
        # Account for the time already spent in the current phase.
        self.state.delta_t -= self.elapsed

        idle_phases = (
            InputRoutine.IDLE,
            InputRoutine.IDLE_WAIT,
            InputRoutine.IDLE_WAIT_2,
            OutputRoutine.IDLE,
        )

        if self.mqtt_in in inputs:
            msg = inputs[self.mqtt_in][0]
            if msg.topic == "simulation/ctrl/nfc":
                logger.trace(f"{type(self).__name__} '{self.name}' MQTT received: {msg}")
                # .get() so a control message without "action" is ignored
                # instead of raising KeyError inside the transition.
                action = msg.payload.get("action")
                if self.state.workpiece and action == "id_update":
                    self.state.workpiece.id = msg.payload["workpiece"]["id"]
                elif self.state.workpiece and action == "color_update":
                    self.state.workpiece.color = WorkpieceColor[msg.payload["workpiece"]["type"]]
            if msg.topic == "simulation/ctrl/all" and msg.payload.get("action") == "refresh":
                self.state.visual_update_pending = True
            # NOTE(review): a control message ends the transition here, so
            # workpiece ports in the same input bag are not processed -
            # confirm this is intended.
            return self.state
        elif self.state.phase not in idle_phases:
            logger.error(f"{type(self).__name__} '{self.name}' received workpiece while not Idle!")

        if self.nfc_in in inputs:
            wp: Workpiece = inputs[self.nfc_in][0]
            logger.trace(f"{type(self).__name__} '{self.name}' NFC received: {wp}")
            if wp.state == "BAKED":
                self.change_phase(OutputRoutine.NFC_WRITING)
                self.state.color_read = wp.color
                wp.state = "PROCESSED"  # TODO: check if this logic is correct
            elif self.state.phase in [InputRoutine.IDLE, OutputRoutine.IDLE]:
                self.change_phase(InputRoutine.NFC_READING)  # new, read
            elif self.state.phase == InputRoutine.IDLE_WAIT_2:
                self.change_phase(InputRoutine.NFC_WRITING)  # seen, write nfc
            self.state.workpiece = wp

        if self.color_in in inputs:
            wp: Workpiece = inputs[self.color_in][0]
            logger.trace(f"{type(self).__name__} '{self.name}' COLOR_SENSOR received: {wp}")
            self.change_phase(InputRoutine.COLOR_READING)
            self.state.workpiece = wp

        return self.state  # important, return state

    def timeAdvance(self) -> float:
        """Return the time until the next internal transition.

        A pending visualization update is emitted immediately (``0.0``);
        otherwise the remaining time of the current phase is returned.
        """
        if self.state.visual_update_pending:
            return 0.0
        return self.state.delta_t

    def outputFnc(self) -> dict:
        """Produce the output emitted right before the internal transition.

        Returns:
            The visualization message when one is pending; otherwise the
            held workpiece on the port matching the phase that just
            finished (plus the NFC message for NFC phases), or an empty
            dict for any other phase.
        """
        if self.state.visual_update_pending:
            return {self.mqtt_out: [self.get_visual_update_data()]}

        if self.state.phase in [InputRoutine.NFC_READING, InputRoutine.NFC_WRITING, OutputRoutine.NFC_WRITING]:
            return {self.nfc_out: [self.state.workpiece], self.mqtt_out: [self.get_nfc_message()]}
        elif self.state.phase == InputRoutine.COLOR_READING:
            return {self.color_out: [self.state.workpiece]}
        return {}

    def intTransition(self) -> ReaderStationState:
        """Advance the model after an output was emitted.

        Returns:
            The updated model state.
        """
        if self.state.visual_update_pending:
            # Only the visualization message was sent; the phase itself
            # keeps running with its remaining delta_t.
            self.state.visual_update_pending = False
            return self.state

        if self.state.phase == InputRoutine.COLOR_READING:
            self.state.color_read = self.state.workpiece.color

        if self.state.phase in [InputRoutine.NFC_WRITING, OutputRoutine.NFC_WRITING]:
            self.state.color_read = WorkpieceColor.NONE  # reset for next input

        # Transition to next phase (idle phase)
        self.state.workpiece = None  # We have just output the workpiece, so mark it as empty again
        self.change_phase(self.state.phase.next())
        return self.state
|