high_bay_warehouse.py 14 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328
  1. from dataclasses import dataclass, field
  2. from enum import Enum
  3. from loguru import logger
  4. from data_models.crate import Crate
  5. from data_models.mqtt_message import MqttMessage
  6. from data_models.warehouse_inventory import WarehouseInventory
  7. from data_models.workpiece import WorkpieceColor
  8. from pypdevs.DEVS import AtomicDEVS, CoupledDEVS
  9. from pypdevs.infinity import INFINITY
  10. from utils.get_timestamp import get_timestamp
  11. from utils.timed_phase_enum import TimedPhaseEnum
  12. class HbwPhase(TimedPhaseEnum):
  13. IDLE = ("IDLE", INFINITY)
  14. CMD_FETCH_CRATE = ("CMD_FETCH_CRATE", 0.0) # receive command to move to inventory (publish state)
  15. MOVING_TO_INVENTORY_FETCH = "MOVING_TO_INVENTORY_FETCH"
  16. # GRAB_CRATE = ("GRAB_CRATE", 10.404) # Replaced below by 3 actions: extend, fetch, retract
  17. EXTEND_ARM_FETCH = ("EXTEND_ARM_FETCH", 5.003)
  18. FETCH_CRATE = ("FETCH_CRATE", 0.563)
  19. RETRACT_ARM_FETCH = ("RETRACT_ARM_FETCH", 4.838)
  20. MOVING_TO_CT_FETCH = "MOVING_TO_CT_FETCH"
  21. PUSH_ONTO_CT = ("PUSH_ONTO_CT", 5.3525)
  22. AWAIT_CRATE = ("AWAIT_CRATE", INFINITY) # wait to get the crate back
  23. # TODO: check if order is correct
  24. CMD_STORE_CRATE = ("CMD_STORE_CRATE", 0.0) # go ahead and store (empty) crate
  25. GET_FROM_CT = ("GET_FROM_CT", 5.299) # pickup wp from CT belt
  26. MOVING_TO_INVENTORY_STORE = "MOVING_TO_INVENTORY_STORE"
  27. #RETURN_CRATE = ("RETURN_CRATE", 11.038) # Replaced below by 3 actions: extend, store, retract
  28. EXTEND_ARM_STORE = ("EXTEND_ARM_STORE", 5.469)
  29. STORE_CRATE = ("STORE_CRATE", 0.634)
  30. RETRACT_ARM_STORE = ("RETRACT_ARM_STORE", 4.935)
  31. MOVING_TO_REST_POS = "MOVING_TO_REST_POS"
  32. def is_location_specific(phase: HbwPhase) -> bool:
  33. """ Check if the phase is location-specific and thus has timings based on location. """
  34. return phase in [HbwPhase.MOVING_TO_INVENTORY_FETCH, HbwPhase.MOVING_TO_CT_FETCH, HbwPhase.MOVING_TO_INVENTORY_STORE, HbwPhase.MOVING_TO_REST_POS]
  35. LOCATION_TIMINGS: dict = {
  36. "A1": {
  37. HbwPhase.MOVING_TO_INVENTORY_FETCH: 5.036,
  38. HbwPhase.MOVING_TO_CT_FETCH: 4.735,
  39. HbwPhase.MOVING_TO_INVENTORY_STORE: 5.3875,
  40. HbwPhase.MOVING_TO_REST_POS: 4.685,
  41. },
  42. "A2": {
  43. HbwPhase.MOVING_TO_INVENTORY_FETCH: 9.070,
  44. HbwPhase.MOVING_TO_CT_FETCH: 8.47,
  45. HbwPhase.MOVING_TO_INVENTORY_STORE: 8.904,
  46. HbwPhase.MOVING_TO_REST_POS: 8.404,
  47. },
  48. "A3": {
  49. HbwPhase.MOVING_TO_INVENTORY_FETCH: 12.805,
  50. HbwPhase.MOVING_TO_CT_FETCH: 11.822,
  51. HbwPhase.MOVING_TO_INVENTORY_STORE: 12.689,
  52. HbwPhase.MOVING_TO_REST_POS: 11.888,
  53. },
  54. "B1": {
  55. HbwPhase.MOVING_TO_INVENTORY_FETCH: 5.1195,
  56. HbwPhase.MOVING_TO_CT_FETCH: 4.6855,
  57. HbwPhase.MOVING_TO_INVENTORY_STORE: 4.9855,
  58. HbwPhase.MOVING_TO_REST_POS: 4.7855,
  59. },
  60. "B2": {
  61. HbwPhase.MOVING_TO_INVENTORY_FETCH: 8.970,
  62. HbwPhase.MOVING_TO_CT_FETCH: 8.3705,
  63. HbwPhase.MOVING_TO_INVENTORY_STORE: 9.154,
  64. HbwPhase.MOVING_TO_REST_POS: 8.354,
  65. },
  66. "B3": {
  67. HbwPhase.MOVING_TO_INVENTORY_FETCH: 12.789,
  68. HbwPhase.MOVING_TO_CT_FETCH: 11.806,
  69. HbwPhase.MOVING_TO_INVENTORY_STORE: 12.7565,
  70. HbwPhase.MOVING_TO_REST_POS: 11.973,
  71. },
  72. "C1": {
  73. HbwPhase.MOVING_TO_INVENTORY_FETCH: 5.369,
  74. HbwPhase.MOVING_TO_CT_FETCH: 4.602,
  75. HbwPhase.MOVING_TO_INVENTORY_STORE: 4.902,
  76. HbwPhase.MOVING_TO_REST_POS: 6.602,
  77. },
  78. "C2": {
  79. HbwPhase.MOVING_TO_INVENTORY_FETCH: 9.0875,
  80. HbwPhase.MOVING_TO_CT_FETCH: 8.4205,
  81. HbwPhase.MOVING_TO_INVENTORY_STORE: 8.8705,
  82. HbwPhase.MOVING_TO_REST_POS: 8.504,
  83. },
  84. "C3": {
  85. HbwPhase.MOVING_TO_INVENTORY_FETCH: 12.890,
  86. HbwPhase.MOVING_TO_CT_FETCH: 11.805,
  87. HbwPhase.MOVING_TO_INVENTORY_STORE: 12.6225,
  88. HbwPhase.MOVING_TO_REST_POS: 11.955,
  89. }
  90. }
  91. @dataclass
  92. class WarehouseState:
  93. current_item: Crate | None = None
  94. mqtt_inventory: WarehouseInventory = WarehouseInventory() # Inventory that is indicative of the future (=not accurate to real state, but accurate to factory output)
  95. visual_inventory: WarehouseInventory = WarehouseInventory() # Inventory that is accurate to the simulation (-> visual updates)
  96. phase: HbwPhase = HbwPhase.IDLE
  97. target: WorkpieceColor | str = "" # workpiece color or 'empty' to denote target is an empty crate
  98. target_location: str = "" # location of the target in the inventory
  99. status_requested: bool = True
  100. delta_t: float = INFINITY
  101. visual_update_pending: bool = False
  102. mqtt_msg_queue: list[MqttMessage] = field(default_factory=list) # queue for mqtt that came in while busy
  103. @dataclass
  104. class InventoryPublisherState:
  105. publish_interval: float
  106. delta_t: float = INFINITY
  107. inventory: WarehouseInventory = WarehouseInventory()
  108. class InventoryPublisher(AtomicDEVS):
  109. """ Atomic DEVS which publishes inventory at a regular interval """
  110. def __init__(self, name: str, publish_interval: float):
  111. super(InventoryPublisher, self).__init__(name)
  112. self.inp = self.addInPort("inp") # receives the latest inventory when updated
  113. self.mqtt_out = self.addOutPort("mqtt_out")
  114. self.state = InventoryPublisherState(publish_interval=publish_interval, delta_t=publish_interval)
  115. self.state.delta_t = publish_interval
  116. def extTransition(self, inputs):
  117. self.state.delta_t -= self.elapsed
  118. if self.inp in inputs:
  119. new_inventory = inputs[self.inp][0]
  120. #logger.trace(f"{type(self).__name__} '{self.name}' received: {new_inventory}")
  121. self.state.inventory = new_inventory
  122. self.state.delta_t = 0.0 # publish immediately
  123. return self.state # important, return state
  124. def timeAdvance(self):
  125. return self.state.delta_t
  126. def outputFnc(self):
  127. # output a stock update over mqtt
  128. stock_msg: MqttMessage = self.state.inventory.get_stock_update_message()
  129. # logger.trace(f"{type(self).__name__} '{self.name}' outputs: {stock_msg}")
  130. return {self.mqtt_out: [stock_msg]}
  131. def intTransition(self):
  132. self.state.delta_t = self.state.publish_interval # reset timer
  133. return self.state
  134. class HighBayWarehouse(AtomicDEVS):
  135. """ High Bay Warehouse which holds crates containing workpieces """
  136. def __init__(self, name: str):
  137. super(HighBayWarehouse, self).__init__(name)
  138. self.inp = self.addInPort("inp")
  139. self.out = self.addOutPort("out")
  140. self.mqtt_in = self.addInPort("mqtt_in")
  141. self.mqtt_out = self.addOutPort("mqtt_out")
  142. self.inventory_out = self.addOutPort("inventory_out") # output for information about the inventory
  143. self.state: WarehouseState = WarehouseState()
  144. def get_status_message(self, use_next_phase=True) -> MqttMessage:
  145. phase = self.state.phase.next() if use_next_phase else self.state.phase
  146. active = 0 if (phase == HbwPhase.IDLE or phase == HbwPhase.AWAIT_CRATE) else 1
  147. code = 2 if active == 1 else 1
  148. message = MqttMessage()
  149. message.topic = "f/i/state/hbw"
  150. message.payload = {
  151. "active": active,
  152. "code": code,
  153. "description": '',
  154. "station": 'hbw',
  155. "ts": get_timestamp(),
  156. }
  157. return message
  158. def change_phase(self, new_phase: HbwPhase):
  159. """ Wrapper for changing the phase, helps with logging """
  160. self.state.phase = new_phase
  161. self.state.delta_t = new_phase.timing
  162. logger.trace(f"{type(self).__name__} '{self.name}' phase changed to {new_phase}")
  163. # Special timings for location-specific phases
  164. if is_location_specific(new_phase):
  165. self.state.delta_t = LOCATION_TIMINGS[self.state.target_location][new_phase]
  166. # update visuals
  167. self.state.visual_update_pending = True
  168. def get_visual_update_data(self) -> MqttMessage:
  169. """ Get visual update data for the animation, contains the action taken and the duration of that action left """
  170. message = MqttMessage()
  171. message.topic = "visualization/hbw"
  172. duration = self.state.delta_t
  173. if duration == INFINITY: duration = None
  174. target = self.state.target
  175. if isinstance(target, WorkpieceColor): target = target.value
  176. message.payload = {
  177. "action": self.state.phase.value,
  178. "duration": duration, # should be equal to the timing of the phase
  179. "crate": self.state.current_item.to_dict() if self.state.current_item else None, # crate, if present
  180. "target": target,
  181. "target_location": self.state.target_location,
  182. }
  183. return message
  184. def handle_mqtt_message(self, mqtt_msg: MqttMessage) -> None:
  185. """ Handle incoming MQTT messages, used for commands and status requests """
  186. if mqtt_msg.topic == "simulation/ctrl/all" and mqtt_msg.payload.get("action") == "refresh":
  187. self.state.status_requested = True
  188. self.state.visual_update_pending = True
  189. elif self.state.phase != HbwPhase.IDLE and mqtt_msg.topic == "fl/vgr/do":
  190. self.state.mqtt_msg_queue.append(mqtt_msg) # queue the message for later processing
  191. elif mqtt_msg.topic == "fl/vgr/do":
  192. if mqtt_msg.payload['code'] == 1: # FETCH EMPTY
  193. self.change_phase(HbwPhase.CMD_FETCH_CRATE)
  194. self.state.target = "empty"
  195. self.state.target_location = self.state.mqtt_inventory.get_loc_empty_crate()
  196. elif mqtt_msg.payload['code'] == 3: # FETCH WP
  197. self.change_phase(HbwPhase.CMD_FETCH_CRATE)
  198. self.state.target = WorkpieceColor(mqtt_msg.payload['workpiece']['type'])
  199. self.state.target_location = self.state.mqtt_inventory.get_loc_workpiece_crate(color=self.state.target)
  200. def extTransition(self, inputs):
  201. self.state.delta_t -= self.elapsed
  202. if self.inp in inputs:
  203. new_crate = inputs[self.inp][0]
  204. self.state.current_item = new_crate
  205. if self.state.phase != HbwPhase.AWAIT_CRATE:
  206. logger.error(f"{type(self).__name__} '{self.name}' received: {new_crate} WHILE NOT AWAITING CRATE")
  207. else:
  208. self.change_phase(HbwPhase.CMD_STORE_CRATE)
  209. logger.trace(f"{type(self).__name__} '{self.name}' received: {new_crate}")
  210. self.state.target_location = self.state.mqtt_inventory.get_loc_free_slot()
  211. # if we have a new workpiece, immediately store it
  212. self._store_crate(self.state.current_item, self.state.mqtt_inventory) # TODO: check if matches with mqtt
  213. elif self.mqtt_in in inputs:
  214. mqtt_msg: MqttMessage = inputs[self.mqtt_in][0]
  215. self.handle_mqtt_message(mqtt_msg)
  216. return self.state
  217. return self.state # important, return state
  218. def timeAdvance(self):
  219. if self.state.visual_update_pending or self.state.status_requested:
  220. return 0.0 # immediate reply
  221. if self.state.phase == HbwPhase.IDLE and self.state.mqtt_msg_queue:
  222. return 0.0 # immediately handle queued MQTT messages
  223. return self.state.delta_t
  224. def outputFnc(self):
  225. if self.state.visual_update_pending:
  226. return {self.mqtt_out: [self.get_visual_update_data(), self.state.visual_inventory.get_visual_update_data()]}
  227. if self.state.status_requested:
  228. return {self.mqtt_out: [self.get_status_message(use_next_phase=False)], self.inventory_out: [self.state.mqtt_inventory]}
  229. if self.state.phase == HbwPhase.PUSH_ONTO_CT:
  230. return {self.out: [self.state.current_item], self.mqtt_out: [self.get_status_message()]}
  231. # else: update the inventory data and status
  232. return {self.inventory_out: [self.state.mqtt_inventory], self.mqtt_out: [self.get_status_message()]}
  233. def _get_crate(self, target: WorkpieceColor | str, inventory: WarehouseInventory) -> Crate | None:
  234. """Get crate, depending on target"""
  235. if target == "":
  236. logger.error(f"{type(self).__name__} '{self.name}' attempting to retrieve crate: {target}")
  237. return None
  238. elif target == "empty":
  239. empty_loc = inventory.get_loc_empty_crate()
  240. return inventory.take(empty_loc)
  241. else:
  242. wp_loc = inventory.get_loc_workpiece_crate(color=target)
  243. return inventory.take(wp_loc)
  244. def _store_crate(self, crate: Crate, inventory: WarehouseInventory) -> None:
  245. inventory.insert(crate)
  246. def intTransition(self):
  247. if self.state.visual_update_pending:
  248. self.state.visual_update_pending = False
  249. return self.state
  250. # if there's a status request, do not change state
  251. if self.state.status_requested:
  252. self.state.status_requested = False
  253. return self.state
  254. if self.state.phase == HbwPhase.FETCH_CRATE:
  255. # Get the appropriate crate
  256. self.state.current_item = self._get_crate(target=self.state.target, inventory=self.state.mqtt_inventory)
  257. # Also update the visual inventory
  258. self._get_crate(self.state.target, self.state.visual_inventory)
  259. elif self.state.phase == HbwPhase.PUSH_ONTO_CT:
  260. # if we just output a crate, remove it
  261. self.state.current_item = None
  262. elif self.state.phase == HbwPhase.STORE_CRATE:
  263. self._store_crate(crate=self.state.current_item, inventory=self.state.visual_inventory) # Store crate visually
  264. self.state.current_item = None # remove stored crate
  265. elif self.state.phase == HbwPhase.MOVING_TO_REST_POS:
  266. self.state.target = "" # reset target
  267. self.state.target_location = ""
  268. if self.state.phase == HbwPhase.IDLE and self.state.mqtt_msg_queue:
  269. # If we are IDLE and have queued MQTT messages, handle them immediately
  270. mqtt_msg = self.state.mqtt_msg_queue.pop(0)
  271. self.handle_mqtt_message(mqtt_msg)
  272. return self.state
  273. # move to next phase
  274. self.change_phase(self.state.phase.next())
  275. return self.state