# vacuum_gripper.py
  1. from dataclasses import dataclass, field
  2. from loguru import logger
  3. from data_models.crate import Crate
  4. from data_models.mqtt_message import MqttMessage
  5. from data_models.workpiece import Workpiece, WorkpieceColor
  6. from pypdevs.DEVS import AtomicDEVS
  7. from pypdevs.infinity import INFINITY
  8. from utils.get_timestamp import get_timestamp
  9. from utils.timed_phase_enum import TimedPhaseEnum
  class InputRoutine(TimedPhaseEnum):
      """Ordered phases of the routine that runs once an input workpiece is detected.

      Every member is a ``(label, duration)`` pair. A duration of ``INFINITY``
      marks a phase that is left through an external event instead of a timer.
      Durations are presumably seconds -- TODO confirm against TimedPhaseEnum.
      """

      IDLE = ('IDLE', INFINITY)

      # --- fetch the workpiece from the input bay (DSI) ---
      MOVE_TO_DSI = ('MOVE_TO_DSI', 7.0525)
      AWAIT_WORKPIECE = ('AWAIT_WORKPIECE', INFINITY)  # blocks until the DSI hands over the workpiece
      MOVE_WP_UP_FROM_DSI = ('MOVE_WP_UP_FROM_DSI', 1.434)  # lift the workpiece off the DSI

      # --- NFC read ---
      MOVE_TO_NFC_READER = ('MOVE_TO_NFC_READER', 2.001)
      AWAIT_NFC_READ = ('AWAIT_NFC_READ', INFINITY)

      # --- colour detection ---
      MOVE_TO_COLOR_SENSOR = ('MOVE_TO_COLOR_SENSOR', 2.468)
      AWAIT_COLOR_READ = ('AWAIT_COLOR_READ', INFINITY)

      # --- NFC write ---
      MOVE_TO_NFC_WRITER = ('MOVE_TO_NFC_WRITER', 1.97)
      AWAIT_NFC_WRITE = ('AWAIT_NFC_WRITE', INFINITY)
      WORKPIECE_RECEIVED = ('WORKPIECE_RECEIVED', 0.0)  # zero-length: instruct the HBW to fetch a crate right away

      # --- hand the workpiece to the HBW through the crate transporter ---
      MOVE_TO_CT = ('MOVE_TO_CT', 9.438)
      # NOTE(review): this label contains a space while every other label uses
      # an underscore (OrderRoutine uses 'AWAIT_CRATE') -- confirm whether any
      # consumer relies on the current spelling before normalising it.
      AWAIT_CRATE = ('AWAIT CRATE', INFINITY)
      PLACE_WP_IN_CRATE = ('PLACE_WP_IN_CRATE', 1.201)
      MOVE_TO_START_POS = ('MOVE_TO_START_POS', 8.255)
  class OrderRoutine(TimedPhaseEnum):
      """Ordered phases of the routine that runs once an order has been received.

      Every member is a ``(label, duration)`` pair; ``INFINITY`` marks a phase
      that only ends when an external event arrives.
      """

      IDLE = ('IDLE', INFINITY)
      MOVE_TO_CT = ('MOVE_TO_CT', 10.305)
      AWAIT_CRATE = ('AWAIT_CRATE', INFINITY)
      # keep the workpiece and give the now-empty crate back
      RETURN_EMPTY_CRATE = ('RETURN_EMPTY_CRATE', 6.77)
      MOVE_TO_MPO = ('MOVE_TO_MPO', 2.434)
      DROP_WP_MPO = ('DROP_WP_MPO', 6.536)
      MOVE_ARM_UP = ('MOVE_ARM_UP', 3.375)
      RETRACT_ARM = ('RETRACT_ARM', 5.7185)
      MOVE_TO_START_POS = ('MOVE_TO_START_POS', 5.579)
  class OutputRoutine(TimedPhaseEnum):
      """Ordered phases for carrying a workpiece sorted by the SLD to the output bay.

      ``MOVE_TO_SLD`` and ``MOVE_TO_NFC`` are declared with a label only: their
      durations depend on the bay colour and are taken from ``SLD_Timings`` by
      the code that enters those phases.
      """

      IDLE = ('IDLE', INFINITY)
      MOVE_TO_SLD = 'MOVE_TO_SLD'  # duration comes from SLD_Timings
      PICKUP_WP_SLD = ('PICKUP_WP_SLD', 5.7355)  # arm goes down for the pickup
      AWAIT_WP_SLD = ('AWAIT_WP_SLD', INFINITY)
      RAISE_WP = ('RAISE_WP', 6.0185)
      MOVE_TO_NFC = 'MOVE_TO_NFC'  # duration comes from SLD_Timings
      AWAIT_NFC = ('AWAIT_NFC', INFINITY)
      AWAIT_DSO_CLEAR = ('AWAIT_DSO_CLEAR', INFINITY)
      RAISE_WP_OFF_NFC = ('RAISE_WP_OFF_NFC', 4.635)
      DROP_WP_DSO = ('DROP_WP_DSO', 2.7675)  # travel to the DSO and release the workpiece
      RAISE_ARM = ('RAISE_ARM', 2.067)
      MOVE_TO_START_POS = ('MOVE_TO_START_POS', 3.435)
  # Colour-dependent durations of the two OutputRoutine moves that have no
  # fixed timing of their own. Rows are (bay colour, MOVE_TO_SLD, MOVE_TO_NFC).
  SLD_Timings: dict = {
      bay_color: {
          OutputRoutine.MOVE_TO_SLD: to_sld,
          OutputRoutine.MOVE_TO_NFC: to_nfc,
      }
      for bay_color, to_sld, to_nfc in (
          ("WHITE", 3.102, 4.202),
          ("RED", 2.501, 4.102),
          ("BLUE", 2.043, 4.077),
      )
  }
  @dataclass
  class VgrState:
      """Mutable simulation state of the vacuum gripper model."""

      # Current step of whichever routine is running. OutputRoutine members are
      # assigned here as well, so the annotation covers all three routines.
      phase: InputRoutine | OrderRoutine | OutputRoutine = InputRoutine.IDLE
      # Time left in the current phase; INFINITY while waiting for an event.
      delta_t: float = INFINITY
      # Workpiece currently held by the gripper, if any.
      workpiece: Workpiece | None = None
      # Crate held temporarily before it is given back to the crate transporter.
      crate: Crate | None = None
      # Message to publish immediately when not None (e.g. a status message).
      immediate_message: MqttMessage | None = None
      # Target reported in the status message ('hbw', 'mpo' or 'dso').
      target: str = ""
      # Bay colour to pick up from when collecting a workpiece at the SLD.
      target_sld_bay_color: str = "NONE"
      # Whether the DSO (output bay) is currently clear.
      dso_clear: bool = True
      # Set when a visualisation update still has to be published.
      visual_update_pending: bool = False
      # MQTT messages that arrived while busy; they are handled later.
      mqtt_msg_queue: list[MqttMessage] = field(default_factory=list)
  def get_hbw_instruction_message(workpiece: Workpiece, code: int) -> MqttMessage:
      """Build the ``fl/vgr/do`` message that instructs the HBW about a workpiece.

      :param workpiece: workpiece whose id, state and colour are reported
      :param code: instruction code placed in the payload
      :return: the populated message (timestamped at creation)
      """
      workpiece_info = {
          "id": str(workpiece.id),
          "state": workpiece.state,
          "type": workpiece.color.value,
      }

      instruction = MqttMessage()
      instruction.topic = "fl/vgr/do"
      instruction.payload = {
          "code": code,
          "ts": get_timestamp(),
          "workpiece": workpiece_info,
      }
      return instruction
  def get_order_response_msg(wp_type: str, state: str = "ORDERED"):
      """Build the ``f/i/order`` message acknowledging that an order was placed.

      :param wp_type: workpiece colour, one of 'BLUE', 'RED', 'WHITE'
      :param state: order state to report
      :return: the populated message (timestamped at creation)
      """
      response = MqttMessage()
      response.topic = "f/i/order"
      response.payload = {"state": state, "ts": get_timestamp(), "type": wp_type}
      return response
  def is_active(phase: InputRoutine | OrderRoutine | OutputRoutine) -> bool:
      """Tell whether *phase* counts as an active VGR state.

      A phase is inactive only when it is the IDLE member of any routine or one
      of the two AWAIT_CRATE phases; everything else is reported as active.

      :param phase: routine phase to classify (any of the three routines)
      :return: True when the phase is active
      """
      inactive_phases = (
          InputRoutine.IDLE,
          InputRoutine.AWAIT_CRATE,
          OrderRoutine.IDLE,
          OrderRoutine.AWAIT_CRATE,
          OutputRoutine.IDLE,
      )
      return phase not in inactive_phases
  class VacuumGripper(AtomicDEVS):
      """Atomic DEVS model of the vacuum gripper (VGR).

      The model steps through InputRoutine, OrderRoutine or OutputRoutine,
      exchanging workpieces and crates with its neighbours over dedicated ports
      and publishing status / visualisation messages on ``mqtt_out``.
      """

      def __init__(self, name: str):
          """Create all ports and the initial state.

          :param name: model name passed on to AtomicDEVS
          """
          super().__init__(name)

          # IN PORTS
          self.dsi_in = self.addInPort("dsi_in")  # input bay input
          self.ct_in = self.addInPort("ct_in")  # crate transporter input
          self.nfc_in = self.addInPort("nfc_in")  # nfc reader input
          self.color_sensor_in = self.addInPort("color_sensor_in")  # color sensor input
          self.sld_in: dict = {
              "RED": self.addInPort("sld_red_in"),
              "BLUE": self.addInPort("sld_blue_in"),
              "WHITE": self.addInPort("sld_white_in"),
          }
          self.mqtt_in = self.addInPort("mqtt_in")

          # OUT PORTS
          self.dsi_out = self.addOutPort("dsi_out")  # input bay output
          self.ct_out = self.addOutPort("ct_out")  # crate transporter output
          self.nfc_out = self.addOutPort("nfc_out")
          self.color_sensor_out = self.addOutPort("color_sensor_out")
          self.mpo_out = self.addOutPort("mpo_out")
          self.sld_out: dict = {
              "RED": self.addOutPort("sld_red_out"),
              "BLUE": self.addOutPort("sld_blue_out"),
              "WHITE": self.addOutPort("sld_white_out"),
          }
          self.dso_out = self.addOutPort("dso_out")
          self.mqtt_out = self.addOutPort("mqtt_out")

          self.state = VgrState()
          # send a status message on startup
          self.state.immediate_message = self.get_status_message(use_next_phase=False)

      def is_idle(self) -> bool:
          """Return True when the current phase is the IDLE member of any routine."""
          return self.state.phase in (InputRoutine.IDLE, OrderRoutine.IDLE, OutputRoutine.IDLE)

      def _is_dsi_receive_valid(self, item) -> bool:
          """Check that *item* from the DSI is acceptable in the current state.

          Valid only while awaiting a workpiece, when *item* is a Workpiece and
          no workpiece is held yet. Every rejection is logged as an error.
          """
          if self.state.phase != InputRoutine.AWAIT_WORKPIECE:
              logger.error(f"{type(self).__name__} '{self.name}' received item whilst NOT Expecting: {item}")
              return False
          if not isinstance(item, Workpiece):
              logger.error(f"{type(self).__name__} '{self.name}' received item {item} is not a Workpiece:")
              return False
          if self.state.workpiece is not None:
              logger.error(
                  f"{type(self).__name__} '{self.name}' received new item whilst still holding one, DROPPING {item}:")
              return False
          return True

      def _is_ct_receive_valid(self, item) -> bool:
          """Check that *item* from the crate transporter is acceptable in the current state.

          Valid only in one of the AWAIT_CRATE phases, when *item* is a Crate
          and no crate is held yet. Every rejection is logged as an error.
          """
          if self.state.phase not in (InputRoutine.AWAIT_CRATE, OrderRoutine.AWAIT_CRATE):
              logger.error(f"{type(self).__name__} '{self.name}' received crate whilst NOT WAITING FOR CRATE: {item}")
              return False
          if not isinstance(item, Crate):
              logger.error(f"{type(self).__name__} '{self.name}' received item {item} is not a Crate:")
              return False
          if self.state.crate is not None:
              logger.error(
                  f"{type(self).__name__} '{self.name}' received new crate whilst still holding one, DROPPING {item}:")
              return False
          return True

      def get_status_message(self, use_next_phase=True) -> MqttMessage:
          """Build the ``f/i/state/vgr`` status message.

          :param use_next_phase: report the phase following the current one
              (default) instead of the current phase itself
          :return: message with ``active`` 1/0 per ``is_active`` and ``code`` 2
              when active, 1 otherwise
          """
          # TODO: make codes more accurate to real factory
          phase = self.state.phase.next() if use_next_phase else self.state.phase
          active = 1 if is_active(phase) else 0
          code = 2 if (active == 1) else 1
          message = MqttMessage()
          message.topic = "f/i/state/vgr"
          message.payload = {
              "active": active,
              "code": code,
              "description": '',
              "station": 'vgr',
              "target": self.state.target,
              "ts": get_timestamp(),
          }
          return message

      def change_phase(self, new_phase: InputRoutine | OrderRoutine | OutputRoutine):
          """Switch to *new_phase*, load its timing and flag a visual update.

          Also refreshes the status target: 'hbw' for any input phase, 'mpo'
          when entering OrderRoutine.MOVE_TO_MPO, 'dso' for any output phase.
          Other order phases leave the previous target untouched.
          """
          self.state.phase = new_phase
          self.state.delta_t = new_phase.timing
          logger.trace(f"{type(self).__name__} '{self.name}' phase changed to {new_phase}")
          self.state.visual_update_pending = True

          # Change target depending on the phase
          if isinstance(new_phase, InputRoutine):
              self.state.target = 'hbw'
          elif isinstance(new_phase, OrderRoutine):
              if new_phase == OrderRoutine.MOVE_TO_MPO:
                  self.state.target = 'mpo'
          elif isinstance(new_phase, OutputRoutine):
              self.state.target = 'dso'

      def get_visual_update_data(self) -> MqttMessage:
          """Build the ``visualization/vgr`` message for the animation.

          Carries the current action, the time left for it (None when the
          phase has no finite duration) and the held crate / workpiece.
          """
          message = MqttMessage()
          message.topic = "visualization/vgr"
          duration = self.state.delta_t
          if duration == INFINITY:
              duration = None
          message.payload = {
              "action": self.state.phase.value,
              "duration": duration,  # should be equal to the timing of the phase
              "crate": self.state.crate.to_dict() if self.state.crate else None,
              "workpiece": self.state.workpiece.to_dict() if self.state.workpiece else None,
              "target_sld": self.state.target_sld_bay_color,  # extra info
          }
          return message

      def handle_mqtt_message(self, msg: MqttMessage) -> None:
          """React to a single MQTT message by updating ``self.state`` in place.

          Messages on "f/i/state/dsi", "f/o/order" and "fl/sld/ack" that arrive
          while the gripper is not idle are queued and replayed later.
          """
          if msg.topic == "simulation/ctrl/all" and msg.payload.get("action") == "refresh":
              self.state.immediate_message = self.get_status_message(use_next_phase=False)  # send a status message
              self.state.visual_update_pending = True  # send visual update
              return  # nothing to return: the method is declared -> None
          elif msg.topic == "simulation/ctrl/nfc":
              if self.state.workpiece and msg.payload["action"] == "id_update":
                  self.state.workpiece.id = msg.payload["workpiece"]["id"]
              elif self.state.workpiece and msg.payload["action"] == "color_update":
                  self.state.workpiece.color = WorkpieceColor[msg.payload["workpiece"]["type"]]
          # Queue some messages for later processing, if we are not IDLE
          elif not self.is_idle() and msg.topic in ["f/i/state/dsi", "f/o/order", "fl/sld/ack"]:
              self.state.mqtt_msg_queue.append(msg)
          elif msg.topic == "f/i/state/dsi":
              if msg.payload['active'] == 1:
                  self.change_phase(InputRoutine.MOVE_TO_DSI)
                  self.state.immediate_message = self.get_status_message(use_next_phase=False)  # send a status message
          elif msg.topic == "f/o/order":
              # reply with f/i/order
              self.state.immediate_message = get_order_response_msg(msg.payload["type"])
          elif msg.topic == "f/i/order" and msg.payload["state"] == 'ORDERED':
              # NOTE(review): this branch is not gated on is_idle() and the topic is
              # not queued above -- confirm an 'ORDERED' message cannot arrive
              # while another routine is still running.
              # start self and hbw to handle the order
              wp = Workpiece("", WorkpieceColor(msg.payload["type"]), "RAW")
              self.state.immediate_message = get_hbw_instruction_message(wp, code=3)
              self.change_phase(OrderRoutine.MOVE_TO_CT)
          elif msg.topic == "fl/sld/ack" and msg.payload["code"] == 2:
              # SLD sorted, so go pick up the workpiece from there
              self.state.target_sld_bay_color = msg.payload["type"]
              self.change_phase(OutputRoutine.MOVE_TO_SLD)
              self.state.immediate_message = self.get_status_message(use_next_phase=False)  # send a status message
              # the travel time depends on the bay colour, so override the phase timing
              self.state.delta_t = SLD_Timings[self.state.target_sld_bay_color][OutputRoutine.MOVE_TO_SLD]
          elif msg.topic == "f/i/state/dso":
              self.state.dso_clear = (msg.payload["active"] == 0)  # check if dso is clear or not
              if self.state.phase == OutputRoutine.AWAIT_DSO_CLEAR and self.state.dso_clear:
                  self.change_phase(self.state.phase.next())

      def extTransition(self, inputs):
          """Handle external input; only the first message of a port is read.

          The remaining phase time is reduced by the elapsed time first, so an
          input that does not change the phase leaves the schedule intact.
          """
          self.state.delta_t -= self.elapsed

          if self.dsi_in in inputs:
              item = inputs[self.dsi_in][0]
              logger.trace(f"{type(self).__name__} '{self.name}' received: {item}")
              if self._is_dsi_receive_valid(item):
                  self.change_phase(InputRoutine.MOVE_WP_UP_FROM_DSI)
                  self.state.workpiece = item
          elif self.ct_in in inputs:
              crate = inputs[self.ct_in][0]
              logger.trace(f"{type(self).__name__} '{self.name}' received: {crate}")
              # Validate once, so that a rejected crate is logged a single time.
              if self._is_ct_receive_valid(crate):
                  if self.state.phase == InputRoutine.AWAIT_CRATE:
                      self.state.crate = crate
                      # put workpiece in the new crate
                      self.change_phase(InputRoutine.PLACE_WP_IN_CRATE)
                      self.state.crate.workpiece = self.state.workpiece
                      self.state.workpiece = None
                  elif self.state.phase == OrderRoutine.AWAIT_CRATE:
                      # move workpiece from crate to internal
                      self.state.workpiece = crate.workpiece
                      crate.workpiece = None
                      self.state.crate = crate
                      # Go and return the empty crate
                      self.change_phase(OrderRoutine.RETURN_EMPTY_CRATE)
          elif self.nfc_in in inputs:
              wp = inputs[self.nfc_in][0]
              logger.trace(f"{type(self).__name__} '{self.name}' received: {wp}")
              if self.state.phase not in [InputRoutine.AWAIT_NFC_READ, InputRoutine.AWAIT_NFC_WRITE, OutputRoutine.AWAIT_NFC]:
                  logger.error(f"{type(self).__name__} '{self.name}' received NFC IN while not expecting: {wp}")
                  return self.state
              self.state.workpiece = wp
              self.change_phase(self.state.phase.next())  # change to next step
              if self.state.phase == OutputRoutine.AWAIT_DSO_CLEAR and self.state.dso_clear:
                  self.change_phase(self.state.phase.next())  # if dso clear, don't need to spend time waiting
          elif self.color_sensor_in in inputs:
              wp = inputs[self.color_sensor_in][0]
              logger.trace(f"{type(self).__name__} '{self.name}' received: {wp}")
              if self.state.phase != InputRoutine.AWAIT_COLOR_READ:
                  logger.error(f"{type(self).__name__} '{self.name}' received COLOR IN while not expecting: {wp}")
                  return self.state
              self.state.workpiece = wp
              self.change_phase(self.state.phase.next())  # change to next step
          elif self.mqtt_in in inputs:
              msg = inputs[self.mqtt_in][0]
              self.handle_mqtt_message(msg)  # handle the mqtt message

          # if SLD input
          for sld_input in self.sld_in.values():
              if sld_input in inputs:
                  if self.state.phase == OutputRoutine.AWAIT_WP_SLD:
                      self.state.workpiece = inputs[sld_input][0]
                      self.change_phase(OutputRoutine.RAISE_WP)
                      self.state.immediate_message = self.get_status_message(use_next_phase=False)  # send a status message
                  else:
                      logger.error(f"{type(self).__name__} '{self.name}' received SLD input while not expecting")
                      return self.state
          return self.state  # important, return state

      def timeAdvance(self):
          """Return the time until the next internal transition.

          Pending visual updates, immediate messages and (while idle) queued
          MQTT messages are served without delay.
          """
          if self.state.visual_update_pending or self.state.immediate_message:
              return 0.0
          if self.state.mqtt_msg_queue and self.is_idle():
              return 0.0  # immediately handle the next mqtt message in the queue
          # The float subtraction in extTransition may leave a tiny negative
          # remainder; never hand a negative time advance to the simulator.
          return max(0.0, self.state.delta_t)

      def outputFnc(self):
          """Produce the output that belongs to the phase that is about to end.

          A pending visual update takes precedence over an immediate message,
          which in turn takes precedence over the phase-specific output.
          """
          if self.state.visual_update_pending:
              return {self.mqtt_out: [self.get_visual_update_data()]}
          if self.state.immediate_message is not None:
              return {self.mqtt_out: [self.state.immediate_message]}
          elif self.state.phase == InputRoutine.MOVE_TO_DSI:
              # notify dsi that we ready to receive workpiece
              return {self.dsi_out: [], self.mqtt_out: [self.get_status_message()]}
          elif self.state.phase == InputRoutine.WORKPIECE_RECEIVED:
              hbw_command: MqttMessage = get_hbw_instruction_message(self.state.workpiece, code=1)
              return {self.mqtt_out: [hbw_command, self.get_status_message()]}
          elif self.state.phase == InputRoutine.PLACE_WP_IN_CRATE or self.state.phase == OrderRoutine.RETURN_EMPTY_CRATE:
              return {self.ct_out: [self.state.crate], self.mqtt_out: [self.get_status_message()]}
          elif self.state.phase == OrderRoutine.DROP_WP_MPO:
              return {self.mpo_out: [self.state.workpiece], self.mqtt_out: [self.get_status_message()]}
          elif self.state.phase in [InputRoutine.MOVE_TO_NFC_READER, InputRoutine.MOVE_TO_NFC_WRITER, OutputRoutine.MOVE_TO_NFC]:
              return {self.nfc_out: [self.state.workpiece], self.mqtt_out: [self.get_status_message()]}
          elif self.state.phase == InputRoutine.MOVE_TO_COLOR_SENSOR:
              return {self.color_sensor_out: [self.state.workpiece], self.mqtt_out: [self.get_status_message()]}
          elif self.state.phase == OutputRoutine.PICKUP_WP_SLD:
              # output request to get workpiece; an empty list, like the dsi_out request above
              return {self.sld_out[self.state.target_sld_bay_color]: []}
          elif self.state.phase == OutputRoutine.DROP_WP_DSO:
              return {self.dso_out: [self.state.workpiece], self.mqtt_out: [self.get_status_message()]}
          return {self.mqtt_out: [self.get_status_message()]}  # return status

      def intTransition(self):
          """Advance the state after the output of the current step was emitted.

          Pending visual updates and immediate messages are cleared one per
          transition before the routine itself moves on to its next phase.
          """
          if self.state.visual_update_pending:
              self.state.visual_update_pending = False
              return self.state
          if self.state.immediate_message is not None:
              self.state.immediate_message = None  # if requested before, we've handled it so undo
              return self.state

          if self.state.phase == InputRoutine.PLACE_WP_IN_CRATE or self.state.phase == OrderRoutine.RETURN_EMPTY_CRATE:
              self.state.crate = None  # we have given our crate to the transporter
          elif self.state.phase in [OrderRoutine.DROP_WP_MPO, OutputRoutine.DROP_WP_DSO]:
              self.state.workpiece = None  # we gave wp to MPO or DSO
          elif self.state.phase in [InputRoutine.MOVE_TO_NFC_READER, InputRoutine.MOVE_TO_NFC_WRITER, InputRoutine.MOVE_TO_COLOR_SENSOR]:
              self.state.workpiece = None  # we've passed on our workpiece to a reader/writer, so remove it here
          elif self.state.phase.next() == OutputRoutine.MOVE_TO_NFC:
              self.change_phase(OutputRoutine.MOVE_TO_NFC)
              # Get timing depending on which place/color we got this from (-> distance changes)
              self.state.delta_t = SLD_Timings[self.state.target_sld_bay_color][OutputRoutine.MOVE_TO_NFC]
              return self.state
          elif (self.state.phase.next() == OutputRoutine.AWAIT_DSO_CLEAR) and self.state.dso_clear:
              self.change_phase(OutputRoutine.DROP_WP_DSO)
              return self.state  # immediately proceed to dropping off workpiece at DSO
          elif self.state.mqtt_msg_queue and self.is_idle():
              # handle the next mqtt message in the queue
              msg = self.state.mqtt_msg_queue.pop(0)
              self.handle_mqtt_message(msg)
              return self.state

          self.change_phase(self.state.phase.next())  # move to the next phase
          return self.state