| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328 |
- from dataclasses import dataclass, field
- from enum import Enum
- from loguru import logger
- from data_models.crate import Crate
- from data_models.mqtt_message import MqttMessage
- from data_models.warehouse_inventory import WarehouseInventory
- from data_models.workpiece import WorkpieceColor
- from pypdevs.DEVS import AtomicDEVS, CoupledDEVS
- from pypdevs.infinity import INFINITY
- from utils.get_timestamp import get_timestamp
- from utils.timed_phase_enum import TimedPhaseEnum
class HbwPhase(TimedPhaseEnum):
    """ Phases the high bay warehouse steps through while fetching and storing a crate.

    Members declared as ``(name, timing)`` carry a fixed timing. Members declared
    with only a name are the location-specific movement phases; their timing is
    looked up per inventory slot in ``LOCATION_TIMINGS``.
    NOTE(review): timings are presumably in seconds and ``next()`` presumably
    follows declaration order - confirm against ``TimedPhaseEnum``.
    """

    # --- resting ---
    IDLE = ("IDLE", INFINITY)

    # --- fetch cycle: inventory slot -> CT ---
    CMD_FETCH_CRATE = ("CMD_FETCH_CRATE", 0.0)  # receive command to move to inventory (publish state)
    MOVING_TO_INVENTORY_FETCH = "MOVING_TO_INVENTORY_FETCH"
    # The former single GRAB_CRATE step (10.404) is split into three actions: extend, fetch, retract
    EXTEND_ARM_FETCH = ("EXTEND_ARM_FETCH", 5.003)
    FETCH_CRATE = ("FETCH_CRATE", 0.563)
    RETRACT_ARM_FETCH = ("RETRACT_ARM_FETCH", 4.838)
    MOVING_TO_CT_FETCH = "MOVING_TO_CT_FETCH"
    PUSH_ONTO_CT = ("PUSH_ONTO_CT", 5.3525)
    AWAIT_CRATE = ("AWAIT_CRATE", INFINITY)  # wait to get the crate back

    # --- store cycle: CT -> inventory slot ---
    # TODO: check if order is correct
    CMD_STORE_CRATE = ("CMD_STORE_CRATE", 0.0)  # go ahead and store (empty) crate
    GET_FROM_CT = ("GET_FROM_CT", 5.299)  # pickup wp from CT belt
    MOVING_TO_INVENTORY_STORE = "MOVING_TO_INVENTORY_STORE"
    # The former single RETURN_CRATE step (11.038) is split into three actions: extend, store, retract
    EXTEND_ARM_STORE = ("EXTEND_ARM_STORE", 5.469)
    STORE_CRATE = ("STORE_CRATE", 0.634)
    RETRACT_ARM_STORE = ("RETRACT_ARM_STORE", 4.935)
    MOVING_TO_REST_POS = "MOVING_TO_REST_POS"
def is_location_specific(phase: HbwPhase) -> bool:
    """ Tell whether the timing of `phase` depends on the targeted inventory location.

    Returns True for the four movement phases (to the inventory for fetching, to
    the CT, to the inventory for storing, and back to the rest position).
    """
    movement_phases = (
        HbwPhase.MOVING_TO_INVENTORY_FETCH,
        HbwPhase.MOVING_TO_CT_FETCH,
        HbwPhase.MOVING_TO_INVENTORY_STORE,
        HbwPhase.MOVING_TO_REST_POS,
    )
    return phase in movement_phases
# Column order of the per-slot timing table below.
_LOCATION_PHASES = (
    HbwPhase.MOVING_TO_INVENTORY_FETCH,
    HbwPhase.MOVING_TO_CT_FETCH,
    HbwPhase.MOVING_TO_INVENTORY_STORE,
    HbwPhase.MOVING_TO_REST_POS,
)

# One row per inventory slot; values line up with _LOCATION_PHASES.
# NOTE(review): the C1 rest-position timing (6.602) is roughly 2 higher than
# A1/B1 - confirm it is a measured value and not a typo.
_LOCATION_TIMING_ROWS = {
    #      to inv (fetch), to CT, to inv (store), to rest pos
    "A1": (5.036, 4.735, 5.3875, 4.685),
    "A2": (9.070, 8.47, 8.904, 8.404),
    "A3": (12.805, 11.822, 12.689, 11.888),
    "B1": (5.1195, 4.6855, 4.9855, 4.7855),
    "B2": (8.970, 8.3705, 9.154, 8.354),
    "B3": (12.789, 11.806, 12.7565, 11.973),
    "C1": (5.369, 4.602, 4.902, 6.602),
    "C2": (9.0875, 8.4205, 8.8705, 8.504),
    "C3": (12.890, 11.805, 12.6225, 11.955),
}

# Maps inventory slot name -> {movement phase -> timing of that movement}.
LOCATION_TIMINGS: dict = {
    slot: dict(zip(_LOCATION_PHASES, row))
    for slot, row in _LOCATION_TIMING_ROWS.items()
}
@dataclass
class WarehouseState:
    """ State of the high bay warehouse model.

    Both inventories are created per instance through ``default_factory``; a plain
    ``WarehouseInventory()`` default would be evaluated once at class definition
    and shared by every ``WarehouseState``.
    """
    # Crate currently held, if any
    current_item: Crate | None = None
    # Inventory that is indicative of the future (=not accurate to real state, but accurate to factory output)
    mqtt_inventory: WarehouseInventory = field(default_factory=WarehouseInventory)
    # Inventory that is accurate to the simulation (-> visual updates)
    visual_inventory: WarehouseInventory = field(default_factory=WarehouseInventory)
    phase: HbwPhase = HbwPhase.IDLE
    # workpiece color or 'empty' to denote target is an empty crate ('' = no target)
    target: WorkpieceColor | str = ""
    # location of the target in the inventory
    target_location: str = ""
    # starts as True, so a status message is pending right after construction
    status_requested: bool = True
    # time left in the current phase
    delta_t: float = INFINITY
    visual_update_pending: bool = False
    # queue for mqtt that came in while busy
    mqtt_msg_queue: list[MqttMessage] = field(default_factory=list)
@dataclass
class InventoryPublisherState:
    """ State of the periodic inventory publisher.

    The inventory is created per instance through ``default_factory``; a plain
    ``WarehouseInventory()`` default would be evaluated once at class definition
    and shared by every ``InventoryPublisherState``.
    """
    # time between two periodic publications
    publish_interval: float
    # time left until the next publication
    delta_t: float = INFINITY
    # most recently received inventory (an empty default one until the first update)
    inventory: WarehouseInventory = field(default_factory=WarehouseInventory)
class InventoryPublisher(AtomicDEVS):
    """ Atomic DEVS that emits the latest known inventory as a stock update.

    A stock update is sent every `publish_interval`, and additionally right
    after a new inventory arrives on `inp`.
    """

    def __init__(self, name: str, publish_interval: float):
        super().__init__(name)
        # receives the latest inventory when updated
        self.inp = self.addInPort("inp")
        self.mqtt_out = self.addOutPort("mqtt_out")
        # the first publication is due after one full interval
        self.state = InventoryPublisherState(
            publish_interval=publish_interval,
            delta_t=publish_interval,
        )

    def timeAdvance(self):
        """ Time left until the next publication. """
        return self.state.delta_t

    def outputFnc(self):
        """ Emit the stock update message of the stored inventory on `mqtt_out`. """
        stock_msg: MqttMessage = self.state.inventory.get_stock_update_message()
        return {self.mqtt_out: [stock_msg]}

    def intTransition(self):
        """ After publishing, restart the countdown with the full interval. """
        state = self.state
        state.delta_t = state.publish_interval
        return state

    def extTransition(self, inputs):
        """ Account for the elapsed time and take over a newly received inventory. """
        state = self.state
        state.delta_t -= self.elapsed
        if self.inp in inputs:
            state.inventory = inputs[self.inp][0]
            state.delta_t = 0.0  # publish immediately
        return state  # important, return state
class HighBayWarehouse(AtomicDEVS):
    """ High Bay Warehouse which holds crates containing workpieces.

    Ports:
        inp:           crates handed back to the warehouse
        out:           crates pushed out of the warehouse
        mqtt_in:       commands ("fl/vgr/do") and refresh requests ("simulation/ctrl/all")
        mqtt_out:      status messages and visualization updates
        inventory_out: the current (mqtt) inventory
    """

    def __init__(self, name: str):
        super().__init__(name)
        self.inp = self.addInPort("inp")
        self.out = self.addOutPort("out")
        self.mqtt_in = self.addInPort("mqtt_in")
        self.mqtt_out = self.addOutPort("mqtt_out")
        self.inventory_out = self.addOutPort("inventory_out")  # output for information about the inventory
        self.state: WarehouseState = WarehouseState()

    def get_status_message(self, use_next_phase=True) -> MqttMessage:
        """ Build the "f/i/state/hbw" status message.

        Args:
            use_next_phase: report on the phase following the current one
                (default) instead of the current phase itself.

        Returns:
            A message with ``active`` 0 / ``code`` 1 when the reported phase is
            IDLE or AWAIT_CRATE, and ``active`` 1 / ``code`` 2 otherwise.
        """
        phase = self.state.phase.next() if use_next_phase else self.state.phase
        active = 0 if phase in (HbwPhase.IDLE, HbwPhase.AWAIT_CRATE) else 1
        code = 2 if active == 1 else 1
        message = MqttMessage()
        message.topic = "f/i/state/hbw"
        message.payload = {
            "active": active,
            "code": code,
            "description": '',
            "station": 'hbw',
            "ts": get_timestamp(),
        }
        return message

    def change_phase(self, new_phase: HbwPhase):
        """ Wrapper for changing the phase, helps with logging.

        Sets the phase and its remaining time, and marks a visual update as pending.

        Raises:
            KeyError: if `new_phase` is location-specific and the current
                target location has no entry in ``LOCATION_TIMINGS``.
        """
        self.state.phase = new_phase
        self.state.delta_t = new_phase.timing
        logger.trace(f"{type(self).__name__} '{self.name}' phase changed to {new_phase}")
        # Special timings for location-specific phases
        if is_location_specific(new_phase):
            self.state.delta_t = LOCATION_TIMINGS[self.state.target_location][new_phase]
        # update visuals
        self.state.visual_update_pending = True

    def get_visual_update_data(self) -> MqttMessage:
        """ Get visual update data for the animation, contains the action taken and the duration of that action left """
        message = MqttMessage()
        message.topic = "visualization/hbw"
        duration = self.state.delta_t
        if duration == INFINITY:
            duration = None  # an unbounded phase is reported without a duration
        target = self.state.target
        if isinstance(target, WorkpieceColor):
            target = target.value
        message.payload = {
            "action": self.state.phase.value,
            "duration": duration,  # should be equal to the timing of the phase
            "crate": self.state.current_item.to_dict() if self.state.current_item else None,  # crate, if present
            "target": target,
            "target_location": self.state.target_location,
        }
        return message

    def handle_mqtt_message(self, mqtt_msg: MqttMessage) -> None:
        """ Handle incoming MQTT messages, used for commands and status requests.

        - A refresh on "simulation/ctrl/all" requests a status and a visual update.
        - A "fl/vgr/do" command is queued while the warehouse is not IDLE.
        - A "fl/vgr/do" command received while IDLE starts a fetch: code 1 targets
          an empty crate, code 3 targets a crate holding the requested workpiece
          type. Commands with another (or no) code are ignored.
        """
        if mqtt_msg.topic == "simulation/ctrl/all" and mqtt_msg.payload.get("action") == "refresh":
            self.state.status_requested = True
            self.state.visual_update_pending = True
            return
        if mqtt_msg.topic != "fl/vgr/do":
            return
        if self.state.phase != HbwPhase.IDLE:
            self.state.mqtt_msg_queue.append(mqtt_msg)  # queue the message for later processing
            return

        # .get(): a command without a code is ignored instead of raising KeyError
        code = mqtt_msg.payload.get('code')
        if code == 1:  # FETCH EMPTY
            self.change_phase(HbwPhase.CMD_FETCH_CRATE)
            self.state.target = "empty"
            self.state.target_location = self.state.mqtt_inventory.get_loc_empty_crate()
        elif code == 3:  # FETCH WP
            self.change_phase(HbwPhase.CMD_FETCH_CRATE)
            self.state.target = WorkpieceColor(mqtt_msg.payload['workpiece']['type'])
            self.state.target_location = self.state.mqtt_inventory.get_loc_workpiece_crate(color=self.state.target)

    def extTransition(self, inputs):
        """ React to a returned crate and/or to incoming MQTT messages.

        Both ports are processed when they carry input at the same time, and
        every MQTT message of the bag is handled. The crate is handled first, so
        a command arriving together with it is queued rather than lost.
        """
        self.state.delta_t -= self.elapsed

        if self.inp in inputs:
            new_crate = inputs[self.inp][0]
            self.state.current_item = new_crate
            if self.state.phase != HbwPhase.AWAIT_CRATE:
                logger.error(f"{type(self).__name__} '{self.name}' received: {new_crate} WHILE NOT AWAITING CRATE")
            else:
                # NOTE(review): slot selection and the early store are assumed to
                # belong to the expected-crate path only - confirm.
                self.change_phase(HbwPhase.CMD_STORE_CRATE)
                logger.trace(f"{type(self).__name__} '{self.name}' received: {new_crate}")
                self.state.target_location = self.state.mqtt_inventory.get_loc_free_slot()
                # if we have a new workpiece, immediately store it
                self._store_crate(self.state.current_item, self.state.mqtt_inventory)  # TODO: check if matches with mqtt

        if self.mqtt_in in inputs:
            for mqtt_msg in inputs[self.mqtt_in]:
                self.handle_mqtt_message(mqtt_msg)

        return self.state  # important, return state

    def timeAdvance(self):
        """ Fire immediately for pending replies or queued commands, else after the phase's remaining time. """
        if self.state.visual_update_pending or self.state.status_requested:
            return 0.0  # immediate reply
        if self.state.phase == HbwPhase.IDLE and self.state.mqtt_msg_queue:
            return 0.0  # immediately handle queued MQTT messages
        return self.state.delta_t

    def outputFnc(self):
        """ Produce the output of the pending event.

        Checked in this order, matching `intTransition`: a pending visual
        update, a requested status, the crate leaving in PUSH_ONTO_CT, and
        otherwise the inventory together with the status of the next phase.
        """
        if self.state.visual_update_pending:
            return {self.mqtt_out: [self.get_visual_update_data(), self.state.visual_inventory.get_visual_update_data()]}
        if self.state.status_requested:
            return {self.mqtt_out: [self.get_status_message(use_next_phase=False)], self.inventory_out: [self.state.mqtt_inventory]}
        if self.state.phase == HbwPhase.PUSH_ONTO_CT:
            return {self.out: [self.state.current_item], self.mqtt_out: [self.get_status_message()]}
        # else: update the inventory data and status
        return {self.inventory_out: [self.state.mqtt_inventory], self.mqtt_out: [self.get_status_message()]}

    def _get_crate(self, target: WorkpieceColor | str, inventory: WarehouseInventory) -> Crate | None:
        """ Take the crate matching `target` out of `inventory`.

        Args:
            target: "empty" for an empty crate, otherwise the workpiece color.
            inventory: the inventory to take the crate from.

        Returns:
            What ``inventory.take`` yields for the matching location, or None
            (after logging an error) when no target is set.
        """
        if target == "":
            logger.error(f"{type(self).__name__} '{self.name}' attempting to retrieve crate: {target}")
            return None
        if target == "empty":
            location = inventory.get_loc_empty_crate()
        else:
            location = inventory.get_loc_workpiece_crate(color=target)
        return inventory.take(location)

    def _store_crate(self, crate: Crate, inventory: WarehouseInventory) -> None:
        """ Insert `crate` into `inventory`. """
        inventory.insert(crate)

    def intTransition(self):
        """ Finish the pending event and, for a finished phase, advance to the next phase.

        Pending visual updates and status requests are only acknowledged; they
        never change the phase.
        """
        if self.state.visual_update_pending:
            self.state.visual_update_pending = False
            return self.state
        # if there's a status request, do not change state
        if self.state.status_requested:
            self.state.status_requested = False
            return self.state

        phase = self.state.phase
        if phase == HbwPhase.FETCH_CRATE:
            # Get the appropriate crate
            self.state.current_item = self._get_crate(target=self.state.target, inventory=self.state.mqtt_inventory)
            # Also update the visual inventory
            self._get_crate(self.state.target, self.state.visual_inventory)
        elif phase == HbwPhase.PUSH_ONTO_CT:
            # if we just output a crate, remove it
            self.state.current_item = None
        elif phase == HbwPhase.STORE_CRATE:
            self._store_crate(crate=self.state.current_item, inventory=self.state.visual_inventory)  # Store crate visually
            self.state.current_item = None  # remove stored crate
        elif phase == HbwPhase.MOVING_TO_REST_POS:
            self.state.target = ""  # reset target
            self.state.target_location = ""

        if phase == HbwPhase.IDLE and self.state.mqtt_msg_queue:
            # If we are IDLE and have queued MQTT messages, handle them immediately
            mqtt_msg = self.state.mqtt_msg_queue.pop(0)
            self.handle_mqtt_message(mqtt_msg)
            return self.state

        # move to next phase
        self.change_phase(phase.next())
        return self.state
|