from loguru import logger

from data_models.crate import Crate
from data_models.mqtt_message import MqttMessage
from data_models.workpiece import WorkpieceColor
from utils.get_timestamp import get_timestamp


class WarehouseInventory:
    """
    Class representing a warehouse inventory of 3x3 containing crates with workpieces

    A1 A2 A3
    B1 B2 B3
    C1 C2 C3

    Every slot holds either a ``Crate`` (with or without a workpiece) or ``None``
    when no crate is stored at that location.
    """

    def __init__(self):
        """ Creates the inventory with one empty crate in each of the nine slots """
        locations = ["A1", "A2", "A3", "B1", "B2", "B3", "C1", "C2", "C3"]
        # Build one distinct Crate per slot. dict.fromkeys(locations, Crate(None))
        # would store the very same instance nine times, so changing the crate of
        # one slot would change all of them.
        self.inventory: dict[str, Crate | None] = {location: Crate(None) for location in locations}
        # the order in which a slot is picked
        self.search_order = ["C1", "B1", "A1", "C2", "B2", "A2", "C3", "B3", "A3"]

    def __str__(self):
        """
        Compact overview of all slots: "_" for no crate, "[]" for an empty crate
        and "[<color value>]" for a crate holding a workpiece
        """
        parts = []
        for loc, crate in self.inventory.items():
            crate_info = "_"
            if crate:
                crate_info = "[]"
                if crate.workpiece:
                    crate_info = f"[{crate.workpiece.color.value}]"
            parts.append(f"{loc}: {crate_info}, ")
        return "Warehouse Inventory(" + "".join(parts) + ")"

    def __repr__(self):
        """ Debug representation listing the repr of every slot content """
        inventory_repr = ", ".join(f"'{loc}': {repr(crate)}" for loc, crate in self.inventory.items())
        return f"WarehouseInventory(inventory={{{inventory_repr}}})"

    def get(self, location: str) -> Crate | None:
        """ Gets a crate from a specific location, None if the slot is free or the location is unknown """
        return self.inventory.get(location)

    def take(self, location: str) -> Crate | None:
        """
        Takes a crate from a specific location (so removes it from the inventory).
        Returns None if the slot was free; logs an error and returns None if the
        location is not part of the inventory.
        """
        if location not in self.inventory:
            # Do not create a new slot for a location that does not exist
            logger.error(f"{type(self).__name__} Unknown location '{location}', nothing taken.")
            return None
        crate = self.inventory[location]
        self.inventory[location] = None
        return crate

    def get_stock_update_message(self) -> MqttMessage:
        """
        Builds the stock message for topic "f/i/stock": a timestamp plus one entry
        per location, whose "workpiece" is None unless a crate with a workpiece is stored there
        """
        message = MqttMessage()
        message.topic = "f/i/stock"
        message.payload['ts'] = get_timestamp()
        message.payload['stockItems'] = [
            {
                "location": location,
                "workpiece": {
                    "type": crate.workpiece.color.value,
                    "state": crate.workpiece.state,
                    "id": str(crate.workpiece.id),
                } if (crate and crate.workpiece) else None
            }
            for location, crate in self.inventory.items()
        ]
        return message

    def get_visual_update_data(self) -> MqttMessage:
        """ Same content as the stock update message, but addressed to topic "visualization/stock" """
        message = self.get_stock_update_message()
        message.topic = "visualization/stock"
        return message

    def get_loc_free_slot(self) -> str | None:
        """ Get the location of the next free slot in the inventory, None if full"""
        # Slots are checked in self.search_order, not in dictionary order
        for location in self.search_order:
            if self.get(location) is None:
                return location
        return None

    def is_full(self) -> bool:
        """ Whether the inventory is full of workpieces """
        return all(crate and crate.workpiece for crate in self.inventory.values())

    def insert(self, crate: Crate) -> str:
        """
        Inserts a crate into the inventory and returns its location,
        logs an error and returns "" if the operation is not allowed
        """
        if self.is_full():
            logger.error(f"{type(self).__name__} INVENTORY FULL, dropping {crate}")
            return ""

        target_loc = self.get_loc_free_slot()
        if target_loc is None:
            # Every slot still holds a crate (possibly an empty one), so there is
            # nowhere to put the new crate. Without this guard the crate would be
            # stored under the key None.
            logger.error(f"{type(self).__name__} NO FREE SLOT, dropping {crate}")
            return ""

        self.inventory[target_loc] = crate
        return target_loc

    def get_loc_workpiece_crate(self, color: WorkpieceColor) -> str | None:
        """ Get the location of the next crate with a workpiece of the given color, None if not available """
        for location in self.search_order:
            item = self.get(location)
            if item and item.workpiece and item.workpiece.color == color:
                return location
        logger.error(f"{type(self).__name__} No workpiece of color '{color}' found in inventory.")
        return None

    def get_loc_empty_crate(self) -> str | None:
        """ Get the location of the next empty crate, None if no empty crate is stored """
        for location in self.search_order:
            item = self.get(location)
            if item and item.workpiece is None:
                return location
        logger.error(f"{type(self).__name__} INVENTORY FULL, no empty space found.")
        return None