123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417 |
- from dataclasses import dataclass, field
- from loguru import logger
- from data_models.crate import Crate
- from data_models.mqtt_message import MqttMessage
- from data_models.workpiece import Workpiece, WorkpieceColor
- from pypdevs.DEVS import AtomicDEVS
- from pypdevs.infinity import INFINITY
- from utils.get_timestamp import get_timestamp
- from utils.timed_phase_enum import TimedPhaseEnum
class InputRoutine(TimedPhaseEnum):
    """ Routine when an input item is detected: Steps in order, along with their timings.

    Every member is declared as a ``(label, timing)`` pair. ``INFINITY`` is used
    for phases that are left through an external event (an incoming item or
    message) instead of after a fixed delay.
    NOTE(review): timings are presumably seconds of simulated time, and the
    declaration order is presumably what ``next()`` follows -- confirm against
    TimedPhaseEnum.
    """
    IDLE = ('IDLE', INFINITY)
    # Move to DSI and retrieve input workpiece
    MOVE_TO_DSI = ('MOVE_TO_DSI', 7.0525)
    AWAIT_WORKPIECE = ('AWAIT_WORKPIECE', INFINITY)  # wait for dsi to hand over workpiece
    MOVE_WP_UP_FROM_DSI = ('MOVE_WP_UP_FROM_DSI', 1.434)  # move wp up from DSI
    # NFC reader
    MOVE_TO_NFC_READER = ('MOVE_TO_NFC_READER', 2.001)
    AWAIT_NFC_READ = ('AWAIT_NFC_READ', INFINITY)
    # Color Reader
    MOVE_TO_COLOR_SENSOR = ('MOVE_TO_COLOR_SENSOR', 2.468)
    AWAIT_COLOR_READ = ('AWAIT_COLOR_READ', INFINITY)
    # NFC write
    MOVE_TO_NFC_WRITER = ('MOVE_TO_NFC_WRITER', 1.97)
    AWAIT_NFC_WRITE = ('AWAIT_NFC_WRITE', INFINITY)
    WORKPIECE_RECEIVED = ('WORKPIECE_RECEIVED', 0.0)  # immediately instruct HBW to retrieve crate
    # Move workpiece to HBW via Crate Transporter
    MOVE_TO_CT = ('MOVE_TO_CT', 9.438)
    # NOTE(review): this label contains a space, unlike OrderRoutine's
    # 'AWAIT_CRATE' -- confirm whether that is intentional before changing it,
    # since the label is what gets published as the visualization "action".
    AWAIT_CRATE = ('AWAIT CRATE', INFINITY)
    PLACE_WP_IN_CRATE = ('PLACE_WP_IN_CRATE', 1.201)
    MOVE_TO_START_POS = ('MOVE_TO_START_POS', 8.255)
class OrderRoutine(TimedPhaseEnum):
    """ Routine when an order is received: Steps in order, along with their timings.

    Every member is declared as a ``(label, timing)`` pair; ``INFINITY`` marks
    a phase that is left through an external event rather than a timeout.
    NOTE(review): timings are presumably seconds of simulated time -- confirm
    against TimedPhaseEnum.
    """
    IDLE = ('IDLE', INFINITY)
    MOVE_TO_CT = ('MOVE_TO_CT', 10.305)
    AWAIT_CRATE = ('AWAIT_CRATE', INFINITY)  # wait until the crate transporter hands over a crate
    RETURN_EMPTY_CRATE = ('RETURN_EMPTY_CRATE', 6.77)  # take the workpiece and return the empty crate
    MOVE_TO_MPO = ('MOVE_TO_MPO', 2.434)
    DROP_WP_MPO = ('DROP_WP_MPO', 6.536)
    MOVE_ARM_UP = ('MOVE_ARM_UP', 3.375)
    RETRACT_ARM = ('RETRACT_ARM', 5.7185)
    MOVE_TO_START_POS = ('MOVE_TO_START_POS', 5.579)
class OutputRoutine(TimedPhaseEnum):
    """ SLD has sorted a workpiece, and we need to bring it to the output.

    Most members are ``(label, timing)`` pairs. ``MOVE_TO_SLD`` and
    ``MOVE_TO_NFC`` are declared with a label only because their duration
    depends on the bay colour; it is looked up in ``SLD_Timings`` instead.
    NOTE(review): how TimedPhaseEnum derives ``timing`` for a label-only member
    is not visible here -- confirm.
    """
    IDLE = ('IDLE', INFINITY)
    MOVE_TO_SLD = 'MOVE_TO_SLD'  # timing below
    PICKUP_WP_SLD = ('PICKUP_WP_SLD', 5.7355)  # lower arm for pickup
    AWAIT_WP_SLD = ('AWAIT_WP_SLD', INFINITY)
    RAISE_WP = ('RAISE_WP', 6.0185)
    MOVE_TO_NFC = 'MOVE_TO_NFC'  # timing below
    AWAIT_NFC = ('AWAIT_NFC', INFINITY)
    AWAIT_DSO_CLEAR = ('AWAIT_DSO_CLEAR', INFINITY)  # only left once the output bay reports clear
    RAISE_WP_OFF_NFC = ('RAISE_WP_OFF_NFC', 4.635)
    DROP_WP_DSO = ('DROP_WP_DSO', 2.7675)  # move to DSO and drop workpiece
    RAISE_ARM = ('RAISE_ARM', 2.067)
    MOVE_TO_START_POS = ('MOVE_TO_START_POS', 3.435)
# Colour-dependent durations of the output routine: the travel time to the SLD
# bay and from there to the NFC station differs per bay colour.
# Rows are (bay colour, MOVE_TO_SLD timing, MOVE_TO_NFC timing).
SLD_Timings: dict = {
    bay_color: {
        OutputRoutine.MOVE_TO_SLD: to_sld,
        OutputRoutine.MOVE_TO_NFC: to_nfc,
    }
    for bay_color, to_sld, to_nfc in (
        ("WHITE", 3.102, 4.202),
        ("RED", 2.501, 4.102),
        ("BLUE", 2.043, 4.077),
    )
}
@dataclass
class VgrState:
    # Mutable simulation state of the vacuum gripper model.

    # Current phase; may belong to any of the three routines.
    phase: InputRoutine | OrderRoutine | OutputRoutine = InputRoutine.IDLE
    # Time left in the current phase (INFINITY while waiting for an event).
    delta_t: float = INFINITY
    # Workpiece currently held by the gripper, if any.
    workpiece: Workpiece | None = None
    crate: Crate | None = None  # temporary to give back to crate transporter
    immediate_message: MqttMessage | None = None  # a message to publish immediately, if not none (e.g. Status message)
    target: str = ""  # target, used for status message (-> 'hbw' or 'mpo' or 'dso')
    target_sld_bay_color: str = "NONE"  # Target bay color when picking up from SLD
    dso_clear: bool = True  # whether the dso (output bay) is clear or not
    # Set whenever the phase changes so a visualization update gets published.
    visual_update_pending: bool = False
    mqtt_msg_queue: list[MqttMessage] = field(default_factory=list)  # queue for incoming mqtt messages received while busy, handle later
def get_hbw_instruction_message(workpiece: Workpiece, code: int) -> MqttMessage:
    """ Build the instruction message published on 'fl/vgr/do'.

    :param workpiece: workpiece the instruction refers to; its id, state and
        colour value are copied into the payload
    :param code: instruction code placed in the payload
    :return: the populated MqttMessage (topic and payload set)
    """
    instruction = MqttMessage()
    instruction.topic = "fl/vgr/do"
    timestamp = get_timestamp()
    workpiece_info = {
        "id": str(workpiece.id),
        "state": workpiece.state,
        "type": workpiece.color.value,
    }
    instruction.payload = {
        "code": code,
        "ts": timestamp,
        "workpiece": workpiece_info,
    }
    return instruction
def get_order_response_msg(wp_type: str, state: str = "ORDERED"):
    """ Build the 'f/i/order' message that acknowledges a placed order.

    :param wp_type: ordered workpiece colour, one of 'BLUE', 'RED', 'WHITE'
    :param state: order state reported in the payload
    :return: the populated MqttMessage (topic and payload set)
    """
    response = MqttMessage()
    response.topic = "f/i/order"
    payload = {"state": state}
    payload["ts"] = get_timestamp()
    payload["type"] = wp_type
    response.payload = payload
    return response
def is_active(phase: InputRoutine | OrderRoutine | OutputRoutine) -> bool:
    """ Is the VGR phase an active state?

    A phase counts as inactive when it is the IDLE phase of any routine or one
    of the two AWAIT_CRATE phases; every other phase (including the remaining
    AWAIT_* phases) is reported as active.

    :param phase: phase of any of the three routines
    :return: True if the phase is not one of the inactive phases
    """
    inactive_phases = (
        InputRoutine.IDLE,
        InputRoutine.AWAIT_CRATE,
        OrderRoutine.IDLE,
        OrderRoutine.AWAIT_CRATE,
        OutputRoutine.IDLE,
    )
    return phase not in inactive_phases
class VacuumGripper(AtomicDEVS):
    """ Atomic DEVS model of the vacuum gripper (VGR).

    The gripper executes one of three routines (InputRoutine, OrderRoutine,
    OutputRoutine), advancing phase by phase. Timed phases end through
    ``intTransition``; phases with an INFINITY timing end when the expected
    item or MQTT message arrives in ``extTransition``. Status and
    visualization messages are published on ``mqtt_out``.
    """

    def __init__(self, name: str):
        """ Create the ports and the initial (idle) state.

        :param name: model name, forwarded to AtomicDEVS
        """
        super().__init__(name)
        # IN PORTS
        self.dsi_in = self.addInPort("dsi_in")  # input bay input
        self.ct_in = self.addInPort("ct_in")  # crate transporter input
        self.nfc_in = self.addInPort("nfc_in")  # nfc reader input
        self.color_sensor_in = self.addInPort("color_sensor_in")  # color sensor input
        # One SLD input port per bay colour
        self.sld_in: dict = {
            "RED": self.addInPort("sld_red_in"),
            "BLUE": self.addInPort("sld_blue_in"),
            "WHITE": self.addInPort("sld_white_in")
        }
        self.mqtt_in = self.addInPort("mqtt_in")
        # OUT PORTS
        self.dsi_out = self.addOutPort("dsi_out")  # input bay output
        self.ct_out = self.addOutPort("ct_out")  # crate transporter output
        self.nfc_out = self.addOutPort("nfc_out")
        self.color_sensor_out = self.addOutPort("color_sensor_out")
        self.mpo_out = self.addOutPort("mpo_out")
        # One SLD output port per bay colour
        self.sld_out: dict = {
            "RED": self.addOutPort("sld_red_out"),
            "BLUE": self.addOutPort("sld_blue_out"),
            "WHITE": self.addOutPort("sld_white_out")
        }
        self.dso_out = self.addOutPort("dso_out")
        self.mqtt_out = self.addOutPort("mqtt_out")
        self.state = VgrState()
        self.state.immediate_message = self.get_status_message(use_next_phase=False)  # send a status message on startup

    def is_idle(self) -> bool:
        """ Check if the vacuum gripper is in an idle state (IDLE phase of any routine) """
        return self.state.phase in [InputRoutine.IDLE, OrderRoutine.IDLE, OutputRoutine.IDLE]

    def _is_dsi_receive_valid(self, item) -> bool:
        """ Checks if the item we received from DSI, and the combination with the current state is valid.

        Logs an error and returns False when we are not awaiting a workpiece,
        when the item is not a Workpiece, or when we already hold one.
        """
        if self.state.phase != InputRoutine.AWAIT_WORKPIECE:
            logger.error(f"{type(self).__name__} '{self.name}' received item whilst NOT Expecting: {item}")
            return False
        if not isinstance(item, Workpiece):
            logger.error(f"{type(self).__name__} '{self.name}' received item {item} is not a Workpiece:")
            return False
        if self.state.workpiece is not None:
            logger.error(
                f"{type(self).__name__} '{self.name}' received new item whilst still holding one, DROPPING {item}:")
            return False
        return True

    def _is_ct_receive_valid(self, item) -> bool:
        """ Checks if the item we received from CrateTransporter, and the combination with the current state is valid.

        Logs an error and returns False when no routine is awaiting a crate,
        when the item is not a Crate, or when we already hold a crate.
        """
        if self.state.phase not in (InputRoutine.AWAIT_CRATE, OrderRoutine.AWAIT_CRATE):
            logger.error(f"{type(self).__name__} '{self.name}' received crate whilst NOT WAITING FOR CRATE: {item}")
            return False
        if not isinstance(item, Crate):
            logger.error(f"{type(self).__name__} '{self.name}' received item {item} is not a Crate:")
            return False
        if self.state.crate is not None:
            logger.error(
                f"{type(self).__name__} '{self.name}' received new crate whilst still holding one, DROPPING {item}:")
            return False
        return True

    def get_status_message(self, use_next_phase=True) -> MqttMessage:
        """ Get the status message for the vacuum gripper.

        :param use_next_phase: report the phase that follows the current one
            (used when the message is emitted at the end of a phase) instead
            of the current phase
        :return: MqttMessage for topic 'f/i/state/vgr'
        """
        # TODO: make codes more accurate to real factory
        phase = self.state.phase.next() if use_next_phase else self.state.phase
        active = 1 if is_active(phase) else 0
        code = 2 if (active == 1) else 1
        message = MqttMessage()
        message.topic = "f/i/state/vgr"
        message.payload = {
            "active": active,
            "code": code,
            "description": '',
            "station": 'vgr',
            "target": self.state.target,
            "ts": get_timestamp(),
        }
        return message

    def change_phase(self, new_phase: InputRoutine | OrderRoutine | OutputRoutine):
        """ Wrapper for changing the phase and associated timing, helps with logging.

        Also flags a pending visualization update and refreshes the status
        target. For OrderRoutine the target only changes at MOVE_TO_MPO; in
        all other order phases the previous target is kept.
        """
        self.state.phase = new_phase
        self.state.delta_t = new_phase.timing
        logger.trace(f"{type(self).__name__} '{self.name}' phase changed to {new_phase}")
        self.state.visual_update_pending = True
        # Change target depending on the phase
        if isinstance(new_phase, InputRoutine):
            self.state.target = 'hbw'
        elif isinstance(new_phase, OrderRoutine):
            if new_phase == OrderRoutine.MOVE_TO_MPO:
                self.state.target = 'mpo'
        elif isinstance(new_phase, OutputRoutine):
            self.state.target = 'dso'

    def get_visual_update_data(self) -> MqttMessage:
        """ Get visual update data for the animation, contains the action taken and the duration of that action left """
        message = MqttMessage()
        message.topic = "visualization/vgr"
        duration = self.state.delta_t
        if duration == INFINITY:
            duration = None  # an open-ended wait is reported without a duration
        message.payload = {
            "action": self.state.phase.value,
            "duration": duration,  # should be equal to the timing of the phase
            "crate": self.state.crate.to_dict() if self.state.crate else None,
            "workpiece": self.state.workpiece.to_dict() if self.state.workpiece else None,
            "target_sld": self.state.target_sld_bay_color,  # extra info
        }
        return message

    def handle_mqtt_message(self, msg: MqttMessage) -> None:
        """ React to one MQTT message by updating the state in place.

        Routine-starting topics ('f/i/state/dsi', 'f/o/order', 'fl/sld/ack')
        are queued while a routine is running and replayed from
        ``intTransition`` once the gripper is idle again.

        :param msg: the received message (topic and payload are read)
        """
        topic = msg.topic
        payload = msg.payload
        if topic == "simulation/ctrl/all" and payload.get("action") == "refresh":
            self.state.immediate_message = self.get_status_message(use_next_phase=False)  # send a status message
            self.state.visual_update_pending = True  # send visual update
        elif topic == "simulation/ctrl/nfc":
            # Externally triggered correction of the held workpiece; ignored when nothing is held
            if self.state.workpiece:
                action = payload["action"]
                if action == "id_update":
                    self.state.workpiece.id = payload["workpiece"]["id"]
                elif action == "color_update":
                    self.state.workpiece.color = WorkpieceColor[payload["workpiece"]["type"]]
        # Queue some messages for later processing, if we are not IDLE
        elif not self.is_idle() and topic in ["f/i/state/dsi", "f/o/order", "fl/sld/ack"]:
            self.state.mqtt_msg_queue.append(msg)
        elif topic == "f/i/state/dsi":
            if payload['active'] == 1:
                self.change_phase(InputRoutine.MOVE_TO_DSI)
                self.state.immediate_message = self.get_status_message(use_next_phase=False)  # send a status message
        elif topic == "f/o/order":
            # reply with f/i/order
            self.state.immediate_message = get_order_response_msg(payload["type"])
        elif topic == "f/i/order" and payload["state"] == 'ORDERED':
            # start self and hbw to handle the order
            wp = Workpiece("", WorkpieceColor(payload["type"]), "RAW")
            self.state.immediate_message = get_hbw_instruction_message(wp, code=3)
            self.change_phase(OrderRoutine.MOVE_TO_CT)
        elif topic == "fl/sld/ack" and payload["code"] == 2:
            # SLD sorted, so go pick up the workpiece from there
            self.state.target_sld_bay_color = payload["type"]
            self.change_phase(OutputRoutine.MOVE_TO_SLD)
            self.state.immediate_message = self.get_status_message(use_next_phase=False)  # send a status message
            # travel time depends on the bay colour, so override the phase timing
            self.state.delta_t = SLD_Timings[self.state.target_sld_bay_color][OutputRoutine.MOVE_TO_SLD]
        elif topic == "f/i/state/dso":
            self.state.dso_clear = (payload["active"] == 0)  # check if dso is clear or not
            if self.state.phase == OutputRoutine.AWAIT_DSO_CLEAR and self.state.dso_clear:
                self.change_phase(self.state.phase.next())

    def extTransition(self, inputs):
        """ Handle an external event.

        :param inputs: mapping of input port to the list of events received on it
        :return: the updated state

        NOTE(review): only the first event of a port is consumed, and of the
        dsi/ct/nfc/color/mqtt ports only the first one present is handled --
        confirm that simultaneous events cannot occur in this coupling.
        """
        # account for the time already spent in the current phase
        self.state.delta_t -= self.elapsed
        if self.dsi_in in inputs:
            item = inputs[self.dsi_in][0]
            logger.trace(f"{type(self).__name__} '{self.name}' received: {item}")
            if self._is_dsi_receive_valid(item):
                self.change_phase(InputRoutine.MOVE_WP_UP_FROM_DSI)
                self.state.workpiece = item
        elif self.ct_in in inputs:
            crate = inputs[self.ct_in][0]
            logger.trace(f"{type(self).__name__} '{self.name}' received: {crate}")
            # validate once, so a rejected crate is logged a single time
            if self._is_ct_receive_valid(crate):
                if self.state.phase == InputRoutine.AWAIT_CRATE:
                    self.state.crate = crate
                    # put workpiece in the new crate
                    self.change_phase(InputRoutine.PLACE_WP_IN_CRATE)
                    self.state.crate.workpiece = self.state.workpiece
                    self.state.workpiece = None
                elif self.state.phase == OrderRoutine.AWAIT_CRATE:
                    # move workpiece from crate to internal
                    self.state.workpiece = crate.workpiece
                    crate.workpiece = None
                    self.state.crate = crate
                    # Go and return the empty crate
                    self.change_phase(OrderRoutine.RETURN_EMPTY_CRATE)
        elif self.nfc_in in inputs:
            wp = inputs[self.nfc_in][0]
            logger.trace(f"{type(self).__name__} '{self.name}' received: {wp}")
            if self.state.phase not in [InputRoutine.AWAIT_NFC_READ, InputRoutine.AWAIT_NFC_WRITE,
                                        OutputRoutine.AWAIT_NFC]:
                logger.error(f"{type(self).__name__} '{self.name}' received NFC IN while not expecting: {wp}")
                return self.state
            self.state.workpiece = wp
            self.change_phase(self.state.phase.next())  # change to next step
            if self.state.phase == OutputRoutine.AWAIT_DSO_CLEAR and self.state.dso_clear:
                self.change_phase(self.state.phase.next())  # if dso clear, don't need to spend time waiting
        elif self.color_sensor_in in inputs:
            wp = inputs[self.color_sensor_in][0]
            logger.trace(f"{type(self).__name__} '{self.name}' received: {wp}")
            if self.state.phase != InputRoutine.AWAIT_COLOR_READ:
                logger.error(f"{type(self).__name__} '{self.name}' received COLOR IN while not expecting: {wp}")
                return self.state
            self.state.workpiece = wp
            self.change_phase(self.state.phase.next())  # change to next step
        elif self.mqtt_in in inputs:
            msg = inputs[self.mqtt_in][0]
            self.handle_mqtt_message(msg)  # handle the mqtt message
        # if SLD input
        for sld_input in self.sld_in.values():
            if sld_input in inputs:
                if self.state.phase == OutputRoutine.AWAIT_WP_SLD:
                    self.state.workpiece = inputs[sld_input][0]
                    self.change_phase(OutputRoutine.RAISE_WP)
                    self.state.immediate_message = self.get_status_message(use_next_phase=False)  # send a status message
                else:
                    logger.error(f"{type(self).__name__} '{self.name}' received SLD input while not expecting")
                    return self.state
        return self.state  # important, return state

    def timeAdvance(self):
        """ Time until the next internal transition.

        Zero while something must be published right away or a queued MQTT
        message can be handled; otherwise the time left in the current phase.
        """
        if self.state.visual_update_pending or self.state.immediate_message:
            return 0.0
        if self.state.mqtt_msg_queue and self.is_idle():
            return 0.0  # immediately handle the next mqtt message in the queue
        return self.state.delta_t

    def outputFnc(self):
        """ Produce the output emitted just before the internal transition.

        Pending visualization updates and immediate messages take priority
        (one per call); otherwise the output depends on the phase that is
        about to end.

        :return: mapping of output port to the list of events to send
        """
        if self.state.visual_update_pending:
            return {self.mqtt_out: [self.get_visual_update_data()]}
        if self.state.immediate_message is not None:
            return {self.mqtt_out: [self.state.immediate_message]}
        elif self.state.phase == InputRoutine.MOVE_TO_DSI:
            return {self.dsi_out: [], self.mqtt_out: [self.get_status_message()]}  # notify dsi that we ready to receive workpiece
        elif self.state.phase == InputRoutine.WORKPIECE_RECEIVED:
            hbw_command: MqttMessage = get_hbw_instruction_message(self.state.workpiece, code=1)
            return {self.mqtt_out: [hbw_command, self.get_status_message()]}
        elif self.state.phase in (InputRoutine.PLACE_WP_IN_CRATE, OrderRoutine.RETURN_EMPTY_CRATE):
            return {self.ct_out: [self.state.crate], self.mqtt_out: [self.get_status_message()]}
        elif self.state.phase == OrderRoutine.DROP_WP_MPO:
            return {self.mpo_out: [self.state.workpiece], self.mqtt_out: [self.get_status_message()]}
        elif self.state.phase in [InputRoutine.MOVE_TO_NFC_READER, InputRoutine.MOVE_TO_NFC_WRITER,
                                  OutputRoutine.MOVE_TO_NFC]:
            return {self.nfc_out: [self.state.workpiece], self.mqtt_out: [self.get_status_message()]}
        elif self.state.phase == InputRoutine.MOVE_TO_COLOR_SENSOR:
            return {self.color_sensor_out: [self.state.workpiece], self.mqtt_out: [self.get_status_message()]}
        elif self.state.phase == OutputRoutine.PICKUP_WP_SLD:
            # empty event list (like dsi_out above), not a dict: output request to get workpiece
            return {self.sld_out[self.state.target_sld_bay_color]: []}
        elif self.state.phase == OutputRoutine.DROP_WP_DSO:
            return {self.dso_out: [self.state.workpiece], self.mqtt_out: [self.get_status_message()]}
        return {self.mqtt_out: [self.get_status_message()]}  # return status

    def intTransition(self):
        """ Internal transition: clear what was just published, or advance the routine.

        :return: the updated state
        """
        # The two "publish now" flags are consumed one at a time, matching outputFnc
        if self.state.visual_update_pending:
            self.state.visual_update_pending = False
            return self.state
        if self.state.immediate_message is not None:
            self.state.immediate_message = None  # if requested before, we've handled it so undo
            return self.state
        if self.state.phase in (InputRoutine.PLACE_WP_IN_CRATE, OrderRoutine.RETURN_EMPTY_CRATE):
            self.state.crate = None  # we have given our crate to the transporter
        elif self.state.phase in [OrderRoutine.DROP_WP_MPO, OutputRoutine.DROP_WP_DSO]:
            self.state.workpiece = None  # we gave wp to MPO or DSO
        elif self.state.phase in (InputRoutine.MOVE_TO_NFC_READER, InputRoutine.MOVE_TO_NFC_WRITER,
                                  InputRoutine.MOVE_TO_COLOR_SENSOR):
            self.state.workpiece = None  # we've passed on our workpiece to a reader/writer, so remove it here
        elif self.state.phase.next() == OutputRoutine.MOVE_TO_NFC:
            self.change_phase(OutputRoutine.MOVE_TO_NFC)
            # Get timing depending on which place/color we got this from (-> distance changes)
            self.state.delta_t = SLD_Timings[self.state.target_sld_bay_color][OutputRoutine.MOVE_TO_NFC]
            return self.state
        elif (self.state.phase.next() == OutputRoutine.AWAIT_DSO_CLEAR) and self.state.dso_clear:
            # NOTE(review): the phase before AWAIT_DSO_CLEAR has an INFINITY timing and is
            # left in extTransition, so this shortcut looks unreachable; it also skips
            # RAISE_WP_OFF_NFC, unlike the extTransition path -- confirm before relying on it.
            self.change_phase(OutputRoutine.DROP_WP_DSO)
            return self.state  # immediately proceed to dropping off workpiece at DSO
        elif self.state.mqtt_msg_queue and self.is_idle():
            # handle the next mqtt message in the queue
            msg = self.state.mqtt_msg_queue.pop(0)
            self.handle_mqtt_message(msg)
            return self.state
        self.change_phase(self.state.phase.next())  # move to the next phase
        return self.state
|