warehouse_inventory.py 4.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111
  1. from loguru import logger
  2. from data_models.crate import Crate
  3. from data_models.mqtt_message import MqttMessage
  4. from data_models.workpiece import WorkpieceColor
  5. from utils.get_timestamp import get_timestamp
  class WarehouseInventory:
      """A 3x3 warehouse rack whose slots hold crates, which may carry a workpiece.

      Slot layout::

          A1 A2 A3
          B1 B2 B3
          C1 C2 C3

      Every slot of ``inventory`` is in one of three states:

      * ``None``: no crate in the slot (a *free slot*),
      * a crate whose ``workpiece`` is ``None`` (an *empty crate*),
      * a crate that carries a workpiece.

      All lookups walk ``search_order`` and return the first matching location.
      """

      def __init__(self):
          """Create the rack with one crate per slot, each built as ``Crate(None)``."""
          locations = ["A1", "A2", "A3", "B1", "B2", "B3", "C1", "C2", "C3"]
          # Built with a comprehension instead of dict.fromkeys(locations, Crate(None)):
          # fromkeys would store the *same* Crate object in all nine slots.
          # NOTE(review): Crate(None) is presumably a crate without a workpiece; confirm in Crate.
          self.inventory: dict[str, Crate | None] = {loc: Crate(None) for loc in locations}
          # The order in which slots are examined when picking a location.
          self.search_order = ["C1", "B1", "A1", "C2", "B2", "A2", "C3", "B3", "A3"]

      def __str__(self):
          """Render one marker per slot: ``_`` free slot, ``[]`` empty crate, ``[<color>]`` workpiece."""
          rendered = []
          for loc, crate in self.inventory.items():
              if not crate:
                  crate_info = "_"
              elif crate.workpiece:
                  crate_info = f"[{crate.workpiece.color.value}]"
              else:
                  crate_info = "[]"
              # Every entry, including the last one, is followed by ", " so the
              # rendered text stays exactly what it has always been.
              rendered.append(f"{loc}: {crate_info}, ")
          return "Warehouse Inventory(" + "".join(rendered) + ")"

      def __repr__(self):
          """Return a debug representation listing the ``repr`` of every slot."""
          inventory_repr = ", ".join(f"'{loc}': {repr(crate)}" for loc, crate in self.inventory.items())
          return f"WarehouseInventory(inventory={{{inventory_repr}}})"

      def get(self, location: str) -> Crate | None:
          """Return the crate stored at ``location`` without removing it.

          Returns ``None`` when the slot is free or ``location`` is not a known slot.
          """
          return self.inventory.get(location)

      def take(self, location: str) -> Crate | None:
          """Remove and return the crate stored at ``location``.

          The slot is set to ``None`` afterwards. An unknown ``location`` returns
          ``None`` and leaves the inventory untouched.
          """
          crate = self.get(location)
          # Only clear slots that exist; assigning blindly would add a bogus
          # location to the inventory (and to every stock message built from it).
          if location in self.inventory:
              self.inventory[location] = None
          return crate

      def get_stock_update_message(self) -> MqttMessage:
          """Build the stock message for topic ``f/i/stock``.

          The payload gets a ``ts`` timestamp and a ``stockItems`` list with one
          entry per slot: its ``location`` plus a ``workpiece`` dict (``type``,
          ``state``, ``id`` as string), or ``None`` when the slot has no workpiece.
          """
          message = MqttMessage()
          message.topic = "f/i/stock"
          # NOTE(review): relies on MqttMessage() providing a dict-like ``payload``.
          message.payload['ts'] = get_timestamp()
          message.payload['stockItems'] = [
              {
                  "location": location,
                  "workpiece": {
                      "type": crate.workpiece.color.value,
                      "state": crate.workpiece.state,
                      "id": str(crate.workpiece.id),
                  } if (crate and crate.workpiece) else None
              }
              for location, crate in self.inventory.items()
          ]
          return message

      def get_visual_update_data(self) -> MqttMessage:
          """Build the same message as the stock update, addressed to ``visualization/stock``."""
          message = self.get_stock_update_message()
          message.topic = "visualization/stock"
          return message

      def get_loc_free_slot(self) -> str | None:
          """Return the first location in ``search_order`` holding no crate, or ``None`` if there is none."""
          for location in self.search_order:
              if self.get(location) is None:
                  return location
          return None

      def is_full(self) -> bool:
          """Return whether every slot holds a crate that carries a workpiece."""
          return all(crate and crate.workpiece for crate in self.inventory.values())

      def insert(self, crate: Crate) -> str:
          """Store ``crate`` in the next free slot and return that slot's location.

          When the inventory is full or no slot is free, an error is logged, the
          crate is not stored, and ``""`` is returned.
          """
          # "Not full of workpieces" does not imply that a slot is free: all slots
          # may hold empty crates. Without this guard the crate would end up
          # under the key ``None``.
          target_loc = None if self.is_full() else self.get_loc_free_slot()
          if target_loc is None:
              logger.error(f"{type(self).__name__} INVENTORY FULL, dropping {crate}")
              return ""
          self.inventory[target_loc] = crate
          return target_loc

      def get_loc_workpiece_crate(self, color: WorkpieceColor) -> str | None:
          """Return the first location in ``search_order`` whose crate carries a ``color`` workpiece.

          Logs an error and returns ``None`` when no such crate exists.
          """
          for location in self.search_order:
              item = self.get(location)
              if item and item.workpiece and item.workpiece.color == color:
                  return location
          logger.error(f"{type(self).__name__} No workpiece of color '{color}' found in inventory.")
          return None

      def get_loc_empty_crate(self) -> str | None:
          """Return the first location in ``search_order`` holding a crate without a workpiece.

          Logs an error and returns ``None`` when there is no empty crate.
          """
          for location in self.search_order:
              item = self.get(location)
              if item and item.workpiece is None:
                  return location
          logger.error(f"{type(self).__name__} INVENTORY FULL, no empty space found.")
          return None