123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110 |
- from loguru import logger
- from data_models.crate import Crate
- from data_models.mqtt_message import MqttMessage
- from data_models.workpiece import WorkpieceColor
- from utils.get_timestamp import get_timestamp
- class WarehouseInventory:
- """ Class representing a warehouse inventory of 3x3 containing crates with workpieces
- A1 A2 A3
- B1 B2 B3
- C1 C2 C3
- """
- def __init__(self):
- locations = ["A1", "A2", "A3", "B1", "B2", "B3", "C1", "C2", "C3"]
- self.inventory: dict[str, Crate | None] = dict.fromkeys(locations, Crate(None))
- self.search_order = ["C1", "B1", "A1", "C2", "B2", "A2", "C3", "B3", "A3"] # the order in which a slot is picked
- def __str__(self):
- output = "Warehouse Inventory("
- for i, loc in enumerate(self.inventory):
- crate_info = "_"
- if self.inventory[loc]:
- crate_info = "[]"
- if self.inventory[loc].workpiece:
- crate_info = f"[{self.inventory[loc].workpiece.color.value}]"
- output += f"{loc}: {crate_info}, "
- output += ")"
- return output
- def __repr__(self):
- inventory_repr = ", ".join(f"'{loc}': {repr(self.inventory[loc])}" for loc in self.inventory)
- return f"WarehouseInventory(inventory={{{inventory_repr}}})"
- def get(self, location: str) -> Crate:
- """ Gets a crate from a specific location """
- return self.inventory.get(location)
- def take(self, location: str) -> Crate:
- """ Takes a crate from a specific location (so removes it from the inventory """
- crate = self.get(location)
- self.inventory[location] = None
- return crate
- def get_stock_update_message(self) -> MqttMessage:
- message = MqttMessage()
- message.topic = "f/i/stock"
- message.payload['ts'] = get_timestamp()
- message.payload['stockItems'] = [
- {
- "location": location,
- "workpiece": {
- "type": crate.workpiece.color.value,
- "state": crate.workpiece.state,
- "id": str(crate.workpiece.id),
- } if (crate and crate.workpiece) else None
- }
- for location, crate in self.inventory.items()
- ]
- return message
-
- def get_visual_update_data(self) -> MqttMessage:
- message = self.get_stock_update_message()
- message.topic = "visualization/stock"
- return message
- def get_loc_free_slot(self) -> str | None:
- """ Get the location of the next free slot in the inventory, None if full"""
- # Insertion order preserved in dictionaries since Python 3.7+
- for location in self.search_order:
- if self.get(location) is None:
- return location
- return None
- def is_full(self) -> bool:
- """ Whether the inventory is full of workpieces """
- return all(crate and crate.workpiece for crate in self.inventory.values())
- def insert(self, crate: Crate) -> str:
- """ Inserts a crate into the inventory and returns its location, logs an error if the operation is not allowed """
- if self.is_full():
- logger.error(f"{type(self).__name__} INVENTORY FULL, dropping {crate}")
- return ""
- else:
- target_loc = self.get_loc_free_slot()
- self.inventory[target_loc] = crate
- return target_loc
- def get_loc_workpiece_crate(self, color: WorkpieceColor) -> str | None:
- """ Get the location of the next crate with <color> workpiece, None if not available """
- for location in self.search_order:
- item = self.get(location)
- if item and item.workpiece and item.workpiece.color == color:
- return location
- logger.error(f"{type(self).__name__} No workpiece of color '{color}' found in inventory.")
- return None
- def get_loc_empty_crate(self) -> str | None:
- """ Get the location of the next empty crate, None if all crates full """
- for location in self.search_order:
- item = self.get(location)
- if item and item.workpiece is None:
- return location
- logger.error(f"{type(self).__name__} INVENTORY FULL, no empty space found.")
- return None
|