warehouse_inventory.py 4.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110
  1. from loguru import logger
  2. from data_models.crate import Crate
  3. from data_models.mqtt_message import MqttMessage
  4. from data_models.workpiece import WorkpieceColor
  5. from utils.get_timestamp import get_timestamp
  6. class WarehouseInventory:
  7. """ Class representing a warehouse inventory of 3x3 containing crates with workpieces
  8. A1 A2 A3
  9. B1 B2 B3
  10. C1 C2 C3
  11. """
  12. def __init__(self):
  13. locations = ["A1", "A2", "A3", "B1", "B2", "B3", "C1", "C2", "C3"]
  14. self.inventory: dict[str, Crate | None] = dict.fromkeys(locations, Crate(None))
  15. self.search_order = ["C1", "B1", "A1", "C2", "B2", "A2", "C3", "B3", "A3"] # the order in which a slot is picked
  16. def __str__(self):
  17. output = "Warehouse Inventory("
  18. for i, loc in enumerate(self.inventory):
  19. crate_info = "_"
  20. if self.inventory[loc]:
  21. crate_info = "[]"
  22. if self.inventory[loc].workpiece:
  23. crate_info = f"[{self.inventory[loc].workpiece.color.value}]"
  24. output += f"{loc}: {crate_info}, "
  25. output += ")"
  26. return output
  27. def __repr__(self):
  28. inventory_repr = ", ".join(f"'{loc}': {repr(self.inventory[loc])}" for loc in self.inventory)
  29. return f"WarehouseInventory(inventory={{{inventory_repr}}})"
  30. def get(self, location: str) -> Crate:
  31. """ Gets a crate from a specific location """
  32. return self.inventory.get(location)
  33. def take(self, location: str) -> Crate:
  34. """ Takes a crate from a specific location (so removes it from the inventory """
  35. crate = self.get(location)
  36. self.inventory[location] = None
  37. return crate
  38. def get_stock_update_message(self) -> MqttMessage:
  39. message = MqttMessage()
  40. message.topic = "f/i/stock"
  41. message.payload['ts'] = get_timestamp()
  42. message.payload['stockItems'] = [
  43. {
  44. "location": location,
  45. "workpiece": {
  46. "type": crate.workpiece.color.value,
  47. "state": crate.workpiece.state,
  48. "id": str(crate.workpiece.id),
  49. } if (crate and crate.workpiece) else None
  50. }
  51. for location, crate in self.inventory.items()
  52. ]
  53. return message
  54. def get_visual_update_data(self) -> MqttMessage:
  55. message = self.get_stock_update_message()
  56. message.topic = "visualization/stock"
  57. return message
  58. def get_loc_free_slot(self) -> str | None:
  59. """ Get the location of the next free slot in the inventory, None if full"""
  60. # Insertion order preserved in dictionaries since Python 3.7+
  61. for location in self.search_order:
  62. if self.get(location) is None:
  63. return location
  64. return None
  65. def is_full(self) -> bool:
  66. """ Whether the inventory is full of workpieces """
  67. return all(crate and crate.workpiece for crate in self.inventory.values())
  68. def insert(self, crate: Crate) -> str:
  69. """ Inserts a crate into the inventory and returns its location, logs an error if the operation is not allowed """
  70. if self.is_full():
  71. logger.error(f"{type(self).__name__} INVENTORY FULL, dropping {crate}")
  72. return ""
  73. else:
  74. target_loc = self.get_loc_free_slot()
  75. self.inventory[target_loc] = crate
  76. return target_loc
  77. def get_loc_workpiece_crate(self, color: WorkpieceColor) -> str | None:
  78. """ Get the location of the next crate with <color> workpiece, None if not available """
  79. for location in self.search_order:
  80. item = self.get(location)
  81. if item and item.workpiece and item.workpiece.color == color:
  82. return location
  83. logger.error(f"{type(self).__name__} No workpiece of color '{color}' found in inventory.")
  84. return None
  85. def get_loc_empty_crate(self) -> str | None:
  86. """ Get the location of the next empty crate, None if all crates full """
  87. for location in self.search_order:
  88. item = self.get(location)
  89. if item and item.workpiece is None:
  90. return location
  91. logger.error(f"{type(self).__name__} INVENTORY FULL, no empty space found.")
  92. return None