mpo_gripper.py 6.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152
  1. from data_models.mqtt_message import MqttMessage
  2. from pypdevs.DEVS import AtomicDEVS
  3. from pypdevs.infinity import INFINITY
  4. from dataclasses import dataclass
  5. from loguru import logger
  6. from data_models.workpiece import Workpiece
  7. from utils.get_timestamp import get_timestamp
  8. from utils.timed_phase_enum import TimedPhaseEnum
  9. class MPOGrPhase(TimedPhaseEnum):
  10. """ Steps in order, along with their timings """
  11. IDLE = ('IDLE', INFINITY)
  12. MOVE_TO_OVEN = ('MOVE_TO_OVEN', 10.938)
  13. AWAIT_TABLE_TURN = ('AWAIT_TABLE_TURN', INFINITY) # wait for saw table to turn
  14. PICKUP_WP = ('PICKUP_WP', 2.201) # Pickup wp from the oven (move down and grip)
  15. AWAIT_WP = ('AWAIT_WP', INFINITY) # Await oven to give us the workpiece (should be instant)
  16. MOVE_TO_SAW_TABLE = ('MOVE_TO_SAW_TABLE', 12.122) # move to saw table and output workpiece
  17. @dataclass
  18. class MPOGrState:
  19. delta_t: float = INFINITY
  20. phase: MPOGrPhase = MPOGrPhase.IDLE
  21. workpiece: Workpiece | None = None
  22. status_requested: bool = True # if true, publish status
  23. visual_update_pending: bool = False
  24. class MPOGripper(AtomicDEVS):
  25. """ MPO vacuum gripper to transport workpieces from the oven to the table saw """
  26. def __init__(self, name: str):
  27. super(MPOGripper, self).__init__(name)
  28. self.oven_in = self.addInPort("oven_in") # input for oven giving workpiece
  29. self.oven_out = self.addOutPort("oven_out") # output for requesting wp from oven
  30. self.table_in = self.addInPort("table_in") # input for getting table saw turning confirmation
  31. self.table_out = self.addOutPort("table_out") # output to push wp to the table saw
  32. self.mqtt_in = self.addInPort("mqtt_in") # input for MQTT messages (e.g. visual refresh request)
  33. self.mqtt_out = self.addOutPort("mqtt_out")
  34. self.state = MPOGrState()
  35. def change_phase(self, new_phase: MPOGrPhase):
  36. """ Wrapper for changing the phase and time associated with it, helps with logging """
  37. self.state.phase = new_phase
  38. self.state.delta_t = new_phase.timing
  39. logger.trace(f"{type(self).__name__} '{self.name}' phase changed to {new_phase}")
  40. self.state.visual_update_pending = True
  41. def get_visual_update_data(self) -> MqttMessage:
  42. """ Get visual update data for the animation, contains the action taken and the duration of that action left """
  43. message = MqttMessage()
  44. message.topic = "visualization/mpo/gripper"
  45. duration = self.state.delta_t
  46. if duration == INFINITY: duration = None
  47. message.payload = {
  48. "action": self.state.phase.value,
  49. "duration": duration, # should be equal to the timing of the phase
  50. "workpiece": self.state.workpiece.to_dict() if self.state.workpiece else None,
  51. }
  52. return message
  53. def get_status_message(self, use_next_phase=True) -> MqttMessage:
  54. phase = self.state.phase.next() if use_next_phase else self.state.phase
  55. active = 0 if (phase == MPOGrPhase.IDLE) else 1
  56. code = 2 if active == 1 else 1
  57. message = MqttMessage()
  58. message.topic = "f/i/state/mpo"
  59. message.payload = {
  60. "active": active,
  61. "code": code,
  62. "description": '',
  63. "station": 'mpo',
  64. "ts": get_timestamp(),
  65. }
  66. return message
  67. def extTransition(self, inputs):
  68. self.state.delta_t -= self.elapsed
  69. if self.mqtt_in in inputs:
  70. msg = inputs[self.mqtt_in][0]
  71. if msg.topic == "simulation/ctrl/all" and msg.payload.get("action") == "refresh":
  72. self.state.status_requested = True
  73. self.state.visual_update_pending = True
  74. return self.state
  75. elif not (self.state.phase == MPOGrPhase.IDLE or self.state.phase == MPOGrPhase.AWAIT_TABLE_TURN or self.state.phase == MPOGrPhase.AWAIT_WP):
  76. logger.error(f"{type(self).__name__} '{self.name}' received input while not expecting: {inputs}")
  77. return self.state
  78. if self.oven_in in inputs:
  79. # 2 options; workpiece or instruction to pickup
  80. if self.state.phase == MPOGrPhase.IDLE:
  81. self.change_phase(MPOGrPhase.MOVE_TO_OVEN) # start moving to oven as requested
  82. elif self.state.phase == MPOGrPhase.AWAIT_WP:
  83. wp = inputs[self.oven_in][0]
  84. self.state.workpiece = wp # pickup new workpiece
  85. self.change_phase(MPOGrPhase.MOVE_TO_SAW_TABLE) # move workpiece to saw table
  86. else:
  87. logger.error(f"{type(self).__name__} '{self.name}' received oven input while not expecting: {inputs}")
  88. elif self.table_in in inputs:
  89. # Only option is if we are awaiting the saw table to turn to its rest position
  90. if self.state.phase == MPOGrPhase.AWAIT_TABLE_TURN:
  91. self.change_phase(MPOGrPhase.PICKUP_WP)
  92. else:
  93. logger.error(f"{type(self).__name__} '{self.name}' received table saw input while not expecting: {inputs}")
  94. return self.state # important, return state
  95. def timeAdvance(self):
  96. if self.state.visual_update_pending or self.state.status_requested:
  97. return 0.0
  98. return self.state.delta_t
  99. def outputFnc(self):
  100. if self.state.visual_update_pending:
  101. return {self.mqtt_out: [self.get_visual_update_data()]}
  102. if self.state.status_requested:
  103. if self.state.phase == MPOGrPhase.IDLE:
  104. return {} # do not output status when idle (other MPO parts likely not idle)
  105. return {self.mqtt_out: [self.get_status_message(use_next_phase=False)]}
  106. if self.state.phase == MPOGrPhase.MOVE_TO_OVEN:
  107. return {self.table_out: {}, self.mqtt_out: [self.get_status_message()]} # signal Table Saw to turn its table to face us
  108. elif self.state.phase == MPOGrPhase.PICKUP_WP:
  109. return {self.oven_out: {}, self.mqtt_out: [self.get_status_message()]} # signal oven that we are ready to pick up the workpiece
  110. elif self.state.phase == MPOGrPhase.MOVE_TO_SAW_TABLE:
  111. return {self.table_out: [self.state.workpiece]} # output workpiece to table, do not signal own status
  112. return {self.mqtt_out: [self.get_status_message()]}
  113. def intTransition(self):
  114. if self.state.visual_update_pending:
  115. self.state.visual_update_pending = False
  116. return self.state
  117. if self.state.status_requested:
  118. self.state.status_requested = False
  119. return self.state
  120. if self.state.phase == MPOGrPhase.MOVE_TO_SAW_TABLE:
  121. self.state.workpiece = None # remove the workpiece we just output to the table saw
  122. # Transition to next state
  123. self.change_phase(self.state.phase.next())
  124. return self.state